1. 程式人生 > 資料庫 >Python帶動態引數功能的sqlite工具類

Python帶動態引數功能的sqlite工具類

本文例項講述了Python帶動態引數功能的sqlite工具類。分享給大家供大家參考,具體如下:

最近在弄sqlite和python

在網上參考各教程後,結合以往java jdbc資料庫工具類寫出以下python連線sqlite的工具類

寫得比較繁瑣 主要是想保留一種類似java的Object…args動態引數寫法 併兼容陣列/list方式傳遞不定個數引數 並且返回值是List形式 dict字典 以便和JSON格式互相轉換

在python中有一些區別 經過該工具類封裝之後可以有以下用法:

db.executeQuery("s * f t w id=? and name=?","id01","name01");//動態引數形式
db.executeQuery("s * f t w id=? and name=?",("id01","name01"));//tuple元組式 等價上面 括號可省略
db.executeQuery("s * f t w id=? and name=?",["id01","name01"]);//list陣列形式

完整Python程式碼如下:

#!/usr/bin/python
#-*- coding:utf-8 -*-  
import sqlite3
import os 
#
# 連線資料庫幫助類
# eg:
#  db = database()
#  count,listRes = db.executeQueryPage("select * from student where id=? and name like ? ",2,10,"%name%")
#  listRes = db.executeQuery("select * from student where id=? and name like ? ","%name%")
#  db.execute("delete from student where id=? ","id01")
#  count = db.getCount("select * from student ")
#  db.close()
#
class database :
  dbfile = "sqlite.db"
  memory = ":memory:"
  conn = None
  showsql = True
  def __init__(self):
    self.conn = self.getConn()
  #輸出工具
  def out(self,outStr,*args):
    if(self.showsql):
      for var in args:
        if(var):
          outStr = outStr + "," + str(var)
      print("db. " + outStr)
    return 
  #獲取連線
  def getConn(self):
    if(self.conn is None):
      conn = sqlite3.connect(self.dbfile)
      if(conn is None):
        conn = sqlite3.connect(self.memory)
      if(conn is None):
        print("dbfile : " + self.dbfile + " is not found && the memory connect error ! ")
      else:
        conn.row_factory = self.dict_factory #字典解決方案
        self.conn = conn
      self.out("db init conn ok ! ")
    else:
      conn = self.conn
    return conn
  #字典解決方案
  def dict_factory(self,cursor,row): 
    d = {} 
    for idx,col in enumerate(cursor.description): 
      d[col[0]] = row[idx] 
    return d
  #關閉連線
  def close(self,conn=None):
    res = 2
    if(not conn is None):
      conn.close()
      res = res - 1
    if(not self.conn is None):
      self.conn.close()
      res = res - 1
    self.out("db close res : " + str(res))
    return res
  #加工引數tuple or list 獲取合理引數list
  #把動態引數集合tuple轉為list 並把單獨的傳遞動態引數list從tuple中取出作為引數
  def turnArray(self,args):
    #args (1,3) 直接呼叫型 exe("select x x",1,3)
    #return [1,3] <- list(args)
    #args ([1,3],) list傳入型 exe("select x x",[ 1,3]) len(args)=1 && type(args[0])=list
    #return [1,3]
    if(args and len(args) == 1 and (type(args[0]) is list) ):
      res = args[0]
    else:
      res = list(args)
    return res
  #分頁查詢 查詢page頁 每頁num條 返回 分頁前總條數 和 當前頁的資料列表 count,listR = db.executeQueryPage("select x x",(args))
  def executeQueryPage(self,sql,page,num,*args):
    args = self.turnArray(args)
    count = self.getCount(sql,args)
    pageSql = "select * from ( " + sql + " ) limit 5 offset 0 "
    #args.append(num)
    #args.append(int(num) * (int(page) - 1) )
    self.out(pageSql,args) 
    conn = self.getConn()
    cursor = conn.cursor()
    listRes = cursor.execute(sql,args).fetchall()
    return (count,listRes)  
  #查詢列表array[map] eg: [{'id': u'id02','birth': u'birth01','name': u'name02'},{'id': u'id03','name': u'name03'}]
  def executeQuery(self,*args):
    args = self.turnArray(args)
    self.out(sql,args) 
    conn = self.getConn()
    cursor = conn.cursor()
    res = cursor.execute(sql,args).fetchall()
    return res  
  #執行sql或者查詢列表 並提交
  def execute(self,args) 
    conn = self.getConn()
    cursor = conn.cursor()
    #sql佔位符 填充args 可以是tuple(1,2)(動態引數陣列) 也可以是list[1,2] list(tuple) tuple(list)
    res = cursor.execute(sql,args).fetchall()
    conn.commit()
    #self.close(conn)
    return res  
  #查詢列名列表array[str] eg: ['id','name','birth']
  def getColumnNames(self,args) 
    conn = self.getConn()
    if(not conn is None):
      cursor = conn.cursor()
      cursor.execute(sql,args)
      res = [tuple[0] for tuple in cursor.description]
    return res  
  #查詢結果為單str eg: 'xxxx'
  def getString(self,args).fetchall()
    columnNames = [tuple[0] for tuple in cursor.description]
    #print(columnNames)
    res = ""
    if(listRes and len(listRes) >= 1):
      res = listRes[0][columnNames[0]]
    return res   
  #查詢記錄數量 自動附加count(*) eg: 3
  def getCount(self,*args):
    args = self.turnArray(args)
    sql = "select count(*) cc from ( " + sql + " ) "
    resString = self.getString(sql,args)  
    res = 0   
    if(resString):
      res = int(resString)
    return res
####################################測試
def main():
  db = database()
  db.execute(
    ''' 
    create table if not exists student(
      id   text primary key,name  text not null,birth  text 
    )
    ''' 
  )
  for i in range(10):
    db.execute("insert into student values('id1" + str(i) + "','name1" + str(i) + "','birth1" + str(i) + "')")
  db.execute("insert into student values('id01','name01','birth01')")
  db.execute("insert into student values('id02','name02','birth01')")
  db.execute("insert into student values('id03','name03','birth01')")
  print(db.getColumnNames("select * from student"))
  print(db.getCount("select * from student " ))
  print(db.getString("select name from student where id = ? ","id02" ))
  print(db.executeQuery("select * from student where 1=? and 2=? ",2 ))
  print(db.executeQueryPage("select * from student where id like ? ",5,"id0%"))
  db.execute("update student set name='nameupdate' where id = ? ","id02")
  db.execute("delete from student where id = ? or 1=1 ","id01")
  db.close()
if __name__ == '__main__':
  main()

更多關於Python相關內容感興趣的讀者可檢視本站專題:《Python操作SQLite資料庫技巧總結》、《Python常見資料庫操作技巧彙總》、《Python資料結構與演算法教程》、《Python函式使用技巧總結》、《Python字串操作技巧彙總》、《Python入門與進階經典教程》及《Python檔案與目錄操作技巧彙總》

希望本文所述對大家Python程式設計有所幫助。