車輪の再発明

いったい何番煎じだよ!

sqlplusが個人的に使いにくいのでpythonでそれっぽいのを書いてみる

Oracle標準のsqlplusですが、個人的にはちょっと使いにくいのでpyhtonで書いてみました。

もちろんわざわざ作らなくてもその辺にツールはいくらでも転がっているんですが、ダウンロードさせてもらえないんです。いやさせてもらえないことはないけど、使いにくいくらいの理由じゃあねえ、、、。客先常駐あるあるだと思う。

ないなら書いてしまえというわけで、、、。

欲しい機能

  • bashみたいにバックスペースで文字を削除したい。
  • bashみたいにカーソルキーでカーソルを移動したい。⇒これできないのがすごく面倒。
  • bashみたいに↑キーで履歴がでるようにしたい。
  • csvでファイルに出力したい。⇒||','||とかでできなくはないんだけど、SQLが気持ち悪くなるし、エスケープしてくれないし、、、

環境

  • python2.7 ←CentOS7.0でフォルトだから入ってる。
  • cx_Oracle 5.3←これだけは別の仕事でどうしても必要だったので入れてもらえた。

使いかた

$ ./oracle_cli.py
Help: ?

> ?

Documented commands (type help <topic>):
========================================
EOF        connect  export_csv  header  query
ask_fetch  exit     fetch_rows  help    quit

>
> connect admin@localhost:1521/PDB01
Password:
Conneted to Oracle successfly.

> query SELECT 'hello!' AS HELLO FROM DUAL
HELLO
----------
hello!

Query done successfly. 1 row(s) selected.

>

sqlplusっぽくインタラクティブに操作できる。

helpも一通り書いたので、所見でも困らない、、、はず。

コード

#!/usr/bin/python

import cx_Oracle
import sys
import re
import os
import csv
from getpass import getpass
from cmd import Cmd

class Oracle_cli(Cmd):
    prompt = "> "
    intro = "Help: ?\n"

    def __init__(self):
        Cmd.__init__(self)
        self.isHeader = True
        self.isConnected = False
        self.isCsv = False
        self.isAskContDef = True

    def do_connect(self, arg):
        if self.isConnected:
            print 'Already connecting.\n'
            return False

        if not re.match(r'[^/]+/[^@]+@',arg):
            passwd = getpass('Password: ')
            arg = arg.replace('@','/'+ passwd +'@')
        try:
            self.connection = cx_Oracle.connect(arg)
            self.cursor = self.connection.cursor()
        except cx_Oracle.DatabaseError, cx_msg:
            print 'Faild to connect.'
            print "%s\n" %cx_msg
            return False

        self.isConnected = True
        print 'Conneted to Oracle successfly.\n'

    def help_connect(self):
        print "Connect to oracle."
        print "Usage: connect UserName[/Password]@HostName:Port/ServiceName\n"

    def do_query(self, arg):
        if not self.isConnected:
            print 'No connection. Execute connect command at first.\n'
            return False

        try:
            self.cursor.execute(arg)
        except cx_Oracle.DatabaseError, cx_msg:
            print 'Faild to query.'
            print "%s\n" %cx_msg
            return False

        self.dscr = self.cursor.description
        if self.dscr:
            if self.isHeader:
                self.header = [self.dscr[i][0] for i in xrange(len(self.dscr))]
                print '\t'.join(self.header)
                print '----------'
                if self.isCsv:
                    self.writer.writerow(self.header)

            self.isCont = True
            self.isFirstLoop = True
            self.isAskCont = self.isAskContDef
            while self.isCont:
                try:
                    self.rows = self.cursor.fetchmany(1)
                except cx_Oracle.DatabaseError, cx_msg:
                    print 'Faild to fetch rows.'
                    print "%s\n" %cx_msg
                    return False
                if not self.rows:
                    break

                if self.isAskCont and not self.isFirstLoop:
                    while True:
                        self.readed = raw_input('Fetch more %s rows ? ([y]es/[n]o/[d]o not ask anymore) ' %self.cursor.arraysize)
                        if self.readed == 'y':
                            break
                        elif self.readed == 'n':
                            self.isCont = False
                            print '\nQuery done successfly. %s row(s) selected.\n' %str(self.cursor.rowcount -1)
                            return False
                        elif self.readed == 'd':
                            self.isAskCont = False
                            break
                        else:
                            print 'Invalid input.'

                self.isFirstLoop = False

                try:
                    self.rows += self.cursor.fetchmany(self.cursor.arraysize -1)
                except cx_Oracle.DatabaseError, cx_msg:
                    print 'Faild to fetch rows.'
                    print "%s\n" %cx_msg
                    return False

                for self.row in self.rows:
                    print '\t'.join([str(i) for i in self.row])
                if self.isCsv:
                    self.writer.writerows(self.rows)

            print '\nQuery done successfly. %s row(s) selected.\n' %self.cursor.rowcount

        else:
            print 'Query done successfly. %s row(s) affected.\n' %self.cursor.rowcount

    def help_query(self):
        print 'Execute query.'
        print 'Usage: query <SQL>\n'

    def do_export_csv(self, arg):
        if arg == 'off':
           self.isCsv = False
           print 'Do not export csv.'
        elif re.match(r'^on +[^ ]+ *$',arg):
            self.csv = re.sub(r'^on +','',arg)
            if os.path.exists(self.csv):
                while True:
                    self.readed = raw_input('%s already exist. Over write ? ([y]es/[n]o) ' %self.csv)
                    if self.readed == 'y':
                        try:
                            self.f = open(self.csv,'w')
                        except:
                            print 'Can not create "%s".\n' %self.csv
                            return False
                        self.writer = csv.writer(self.f,lineterminator='\n')
                        self.isCsv = True
                        print 'Export csv file.\n'
                        return False
                    elif self.readed == 'n':
                        return False
                    else:
                        print 'Invalid input.'
        else:
            print 'Invalid argument.\n'

    def help_export_csv(self):
        print 'Eport csv file.'
        print 'Usage: export_csv on </path/of/csv>|off\n'

    def do_ask_fetch(self, arg):
        if arg == 'on':
            self.isAskContDef = True
            print 'Ask whether to continue fetching every time.\n'

        elif arg == 'off':
            self.isAskContDef = False
            print 'Do not ask whether to continue fetching.\n'

        else:
            print 'Invalid argument.\n'

    def help_ask_fetch(self):
        print 'Ask whether to continue fetching every time.'
        print 'Usage: ask_fetch on|off\n'

    def do_header(self, arg):
        if arg == 'on':
            self.isHeader = True
            print 'Header on.\n'

        elif arg == 'off':
            self.isHeader = False
            print 'Header off.\n'

        else:
            print 'Invalid argument.\n'

    def help_header(self):
        print 'Show column names as a header.'
        print 'Usage: header on|off\n'

    def do_fetch_rows(self, arg):
        if not self.isConnected:
            print 'No connection. Execute connect command at first.\n'
            return False
        try:
            self.cursor.arraysize = int(arg)
        except:
            print 'Invalid argument.\n'
            return False
        print '%s row(s) fetch at a time.\n' %self.cursor.arraysize

    def help_fetch_rows(self):
        print 'The number of rows to fetch at a time.'
        print 'Usage: fetch_rows <integer>\n'

    def do_exit(self, arg):
        return True

    def help_exit(self):
        print "Exit here.\n"

    def do_quit(self, arg):
        return True

    def help_quit(self):
        print "Exit here.\n"

    def do_EOF(self, arg):
        return True

    def help_EOF(self):
        print "Exit here.\n"

    def emptyline(self):
        pass


if __name__ == '__main__':
    cli=Oracle_cli()

    args = sys.argv
    if args:
        for i in xrange(len(args)):
            if args[i] == '-c':
                i += 1
                cli.do_connect(args[i])
            if args[i] == '-h':
                print 'Usage: [-c UserName[/Password]@HostName:Port/ServiceName|-h]'
                exit()

    cli.cmdloop()