Python--資料庫操作與pymysql
阿新 • • 發佈:2019-02-02
在進行Python開發以及測試框架、工具開發時,我們會經常從資料庫獲取相關資料、更新資料庫中資料,Python提供了很多的包供大家使用,在這裡我簡單給大家介紹一下pymysql。
1、連線資料庫
pymysql.connect提供了連線資料庫的方法,該方法會連線資料庫並返回一個物件,表示當前到資料庫的會話。
下面介紹相關引數:
host | 主機名稱,若本地資料庫則為127.0.0.1 |
port | 連線資料庫的埠,一般為3306 |
user | 連線資料庫的使用者名稱 |
password | 連線資料庫的密碼 |
db | 資料庫名稱 |
use_unicode | 將資料庫中的資料按GBK格式解碼成位元組碼 |
charset | 將解碼後的位元組碼按UTF-8格式編碼資料,最後再將資料返回給客戶端 |
import pymysql
conn=pymysql.connect(host='127.0.0.1',port='3306',user='root',passwd='198903017',
db='api_database',use_unicode=True, charset="utf8",)
下面介紹連線物件的方法:
close() | 關閉連線物件,之後連線物件及其遊標cursor將不可用 |
commit() | 提交未提交的事務,如果支援的話,否則什麼都不做 |
rollback() | 回滾未提交的事務(可能不可用) |
cursor() | 返回連線的遊標物件 |
2、遊標
下面介紹遊標物件的方法:
callproc(name[,params]) | 使用指定的引數呼叫指定的資料庫過程(可選) |
close() | 關閉遊標。關閉後遊標不可用 |
execute(open[,params]) | 執行一個sql操作---可能指定引數 |
executemany(open,pseq) | 執行指定的sql操作多次,每次都序列中的一組引數 |
fetchone() | 以序列的方式取回查詢結果中的下一行;如果沒有更好的行,就返回None |
fetchmany([size]) | 取回查詢結果中的多行,其中引數size的值預設為arraysize |
fetchall() | 以序列的方式取回餘下的所有行 |
nextset() | 跳到下一個結果集,這個方法是可選的 |
setinputsizes(sizes) | 用於為引數預定義記憶體區域 |
setoutputsize(size[,coll]) | 為取回大量資料而設定緩衝區長度 |
下面介紹遊標物件的屬性
description | 由結果列描述組成的序列(只讀) |
rowcount | 結果包含的行數(只讀) |
arraysize | fetchmany返回的行數,預設為1 |
3、pymysql自定義封裝函式(提供借鑑)
下面是我自己在做介面自動化框架時封裝的關於資料庫操作的類以及類裡面的一些函式,提供給大家借鑑:
import pymysql
from Common.CommonTools.PathTools import config_path
from Common.CommonTools.ConfigReadTools import ConfigReadTools
'''資料庫操作相關函式'''
class DataBaseTools:
def __init__(self):
'''初始化-連線資料庫'''
self.conn=self._connent_db() #連線資料庫
self.cur=self.conn.cursor() #建立遊標
def _connent_db(self):
'''連線資料庫'''
configread = ConfigReadTools(config_path)
database_conf = configread.get_section_item('DATABASE_CONF')
try:
conn=pymysql.connect(host=database_conf['Host'], #資料庫地址
port=int(database_conf['Port']), #埠號
user=database_conf['User'], #登入使用者名稱
passwd=database_conf['Password'], #登入密碼
db=database_conf['DB'], #連線的資料庫
use_unicode=True, charset="utf8",)
return conn
except Exception as e:
print('[Error] 資料庫連線失敗,具體錯誤:%s'%e)
def execute_sql(self,sql):
'''
執行sql語句
:param sql:sql語句
'''
try:
self.cur.execute(sql) #執行sql
except Exception as e:
self.conn.rollback() #回退資料庫
print('[Error] sql語句有誤,資料庫回退,具體錯誤:%s'%e)
else:
self.conn.commit() #sql正確時才進行提交
def get_select_alldata(self,selectsql):
'''
獲取查詢sql對應的所有資料
:param sql: select sql語句
:return: 查詢資料資訊
'''
try:
if 'Select' in selectsql or 'select' in selectsql or 'SELECT' in selectsql:
self.cur.execute(selectsql)
except Exception as e:
self.conn.rollback()
print('[Error] 查詢sql語句有誤.資料庫回退,具體錯誤:%s'%e)
else:
try:
result = self.cur.fetchall() # 獲取查詢結果
return result
except Exception as a:
print('[Error] 執行sql報錯,具體錯誤:%s'%a)
def get_selectdata_row(self,selectsql,index):
'''
獲取查詢sql反饋的某一行資料
:param selectsql: 查詢sql語句
:param index: 指定一行資料
:return: 反饋指定一行的資料
'''
result=self.get_select_alldata(selectsql)
return result[index]
def get_selectdata_column(self,selectsql,index):
'''
獲取查詢sql反饋的某一列資料
:param selectsql:查詢sql語句
:param index:指定一列資料
:return:反饋指定一列的資料
'''
column_data=[]
result=self.get_select_alldata(selectsql)
for i in range(len(result)):
column_data.append(result[i][index])
return column_data
def get_specific_data(self,selectsql,rowindex,columnindex):
'''
獲取指定的某一行的某一列的值,獲取特定某個資料
:param selectsql: 查詢sql語句
:param rowindex: 指定行數
:param columnindex: 指定列數
:return: 特定某個資料
'''
result=self.get_select_alldata(selectsql)
return result[rowindex][columnindex]
def check_value_isnull(self,selectsql,rowindex,columnindex):
'''
檢查指定單元格的value是否為空,空返回True,非空返回False
:param selectsql: 查詢sql語句
:param rowindex: 指定行數
:param columnindex: 指定列數
:return: True/False
'''
value=self.get_specific_data(selectsql,rowindex,columnindex)
if value==None:
return True
else:
return False
def cur_close(self):
'''關閉遊標'''
self.cur.close()
def close_db(self):
'''關閉資料庫'''
try:
self.conn.close()
except Exception as e:
print('[Error] 資料庫關閉出錯.具體錯誤:%s'%e)