#!/usr/bin/python
# -*- coding: cp936 -*-
#################################
# Written by caocao #
#################################
import sys, string, os
from types import *
import MySQLdb
# Startup banner: author credit, contact e-mail and homepage, one per line.
# Emitted at module level, so it is printed whenever this file is executed
# or imported.
print("\n".join((
    "Written by caocao",
    "caocao@eastday.com",
    "http://nethermit.yeah.net",
)))
def iif(expression, whenTrue, whenFalse):
    """Return ``whenTrue`` if ``expression`` is truthy, else ``whenFalse``.

    This is a plain function, so both ``whenTrue`` and ``whenFalse`` are
    evaluated by the caller before the choice is made; it does not
    short-circuit.

    Args:
        expression: Any object; only its truth value is inspected.
        whenTrue: Value returned when ``expression`` is truthy.
        whenFalse: Value returned when ``expression`` is falsy.

    Returns:
        Either ``whenTrue`` or ``whenFalse``, unchanged.
    """
    if expression:
        return whenTrue
    return whenFalse
class mysqlTest:
    """Tiny MySQL shell session built on a MySQLdb connection.

    Constructing an instance prints a banner, connects to the server and
    prints a handful of connection properties.  ``runSQL`` then executes one
    statement at a time and prints its metadata and rows.

    Attributes:
        connection: The object returned by ``MySQLdb.connect`` (``None``
            until the connection has been established).
        host, user, passwd, db: The connection parameters as given.
        result: Whatever ``connection.store_result()`` returned for the most
            recent successful query (``None`` before the first query).
    """

    def __init__(self, host="localhost", user="root", passwd="", db=""):
        """Print the banner, connect, and print server/connection details.

        Args:
            host: Server host name.
            user: Account name.
            passwd: Account password.
            db: Database to select on connect.

        Raises:
            SystemExit: With status 1 when MySQLdb reports a connection
                error.
        """
        self.connection = None
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db
        self.result = None
        print("-" * 40)
        print("MySQL Shell v 1.0")
        print("Usage: python mysql.shell.py [host] [user] [passwd(% is empty)] [db]")
        print("Connect...")
        try:
            self.connection = MySQLdb.connect(
                host=self.host, user=self.user, passwd=self.passwd, db=self.db)
        except MySQLdb.Error:
            # Only database errors are treated as "cannot connect"; a bare
            # ``except`` would also swallow KeyboardInterrupt and SystemExit.
            print("Can't connect to mysql server.\nPlease make sure your username or password is right.")
            sys.exit(1)
        print("-" * 40)
        for name in ("get_server_info", "get_host_info", "get_proto_info",
                     "info", "character_set_name", "thread_id", "stat"):
            print(self.printComment("connection", name))

    def __del__(self):
        """Close the connection (if any) and print the quit message."""
        # getattr guards against instances whose __init__ never ran far
        # enough to set the attribute.
        connection = getattr(self, "connection", None)
        if connection is not None:
            connection.close()
        print("-" * 40)
        print("Quit...")

    def printComment(self, instance, function):
        """Format ``"<function> = <value>"`` for a zero-argument method.

        Args:
            instance: Name of an attribute of ``self`` (the callers in this
                class pass ``"connection"`` or ``"result"``).
            function: Name of a method on that attribute; it is called with
                no arguments.

        Returns:
            The method name right-justified to 18 columns, followed by
            ``" = "`` and the call's return value.
        """
        # Attribute lookup replaces eval() of a concatenated source string.
        target = getattr(self, instance)
        value = getattr(target, function)()
        return "%s = %s" % (function.rjust(18), value)

    def printAll(self):
        """Render the column names and every row of ``self.result`` as text.

        Returns:
            One line per column name (as ``repr``), then one line per cell
            in row-major order.  ``str`` cells are emitted verbatim, all
            other values via ``repr``.  Returns ``""`` when there is no
            result object.
        """
        result = self.result
        if result is None:
            # Nothing to render (no query yet, or the last statement
            # produced no result object).
            return ""
        rows = result.fetch_row(0)  # 0 is passed as the row limit; presumably "all rows"
        columns = result.describe()  # fetched once instead of once per column
        lines = [repr(column[0]) for column in columns]
        for row in rows:
            for cell in row:
                if isinstance(cell, str):
                    lines.append(cell)
                else:
                    lines.append(repr(cell))
        return "".join([line + "\n" for line in lines])

    def runSQL(self, queryString="show databases"):
        """Execute one SQL statement and print its metadata and rows.

        Prints ``"Can't run sql."`` and returns when MySQLdb reports an
        error for the statement; ``self.result`` is left untouched in that
        case.

        Args:
            queryString: The SQL text to execute.
        """
        print("-" * 40)
        try:
            self.connection.query(queryString)
        except MySQLdb.Error:
            print("Can't run sql.")
            return
        self.result = self.connection.store_result()
        for name in ("field_count", "affected_rows", "insert_id"):
            print(self.printComment("connection", name))
        # NOTE(review): store_result() is expected to give None for
        # statements without a result set (INSERT, UPDATE, ...); the
        # result-level metadata is skipped then instead of raising
        # AttributeError.
        if self.result is not None:
            for name in ("num_fields", "num_rows", "field_flags"):
                print(self.printComment("result", name))
        print("-" * 40)
        print(self.printAll())
def main():
    """Run the interactive shell: connect, then read and execute statements.

    Command-line arguments are passed positionally to ``mysqlTest`` (host,
    user, passwd, db).  An argument that is exactly ``%`` stands for an
    empty string, as advertised in the usage line.  The loop ends on
    ``exit``/``quit`` (case-insensitive) or end of input; blank lines are
    ignored.
    """
    # Pass the arguments directly instead of eval()-ing generated source:
    # quotes or backslashes in an argument can no longer break or inject
    # code, and "%" inside a longer value (e.g. a password) is preserved.
    args = []
    for arg in sys.argv[1:]:
        if arg == "%":
            arg = ""
        args.append(arg)
    if not args:
        # With no arguments the previous implementation ended up calling
        # mysqlTest(""), i.e. an empty host; keep that.
        args = [""]
    shell = mysqlTest(*args)

    try:
        read_line = raw_input
    except NameError:
        # Interpreters without raw_input provide the same behaviour as input.
        read_line = input

    while True:
        try:
            command = read_line("PS mysql>").strip(" ")
        except EOFError:
            break
        lowered = command.lower()
        if lowered in ("exit", "quit"):
            break
        if lowered:
            shell.runSQL(command)


if __name__ == "__main__":
    main()