python+requests介面自動化框架的實現
為什麼要做介面自動化框架
1、業務與配置的分離
2、資料與程式的分離;資料的變更不影響程式
3、有日誌功能,實現無人值守
4、自動傳送測試報告
5、不懂程式設計的測試人員也可以進行測試
正常介面測試的流程是什麼?
確定介面測試使用的工具----->配置需要的介面引數----->進行測試----->檢查測試結果----->生成測試報告
測試的工具:python+requests
介面測試用例:excel
一、介面框架如下:
1、action包:用來存放關鍵字函式
2、config包:用來存放配置檔案
3、TestData:用來存放測試資料,excel表
4、Log包:用來存放日誌檔案
5、utils包:用來存放公共的類
6、執行主程式interface_auto_test.py
7、Readme.txt:告訴團隊組員使用改框架需要注意的地方
二、介面的資料規範設計---Case設計
一個sheet對應資料庫裡面一張表
APIsheet存放
編號;從1開始
介面的名稱(APIName);
請求的url(RequestUrl);
請求的方法(RequestMethod);
傳參的方式(paramsType):post/get請求方法不一樣
用例說明(APITestCase)
是否執行(Active)部分介面已測通,下次不用測試,直接把這裡設定成N,跳過此介面
post與get的區別
檢視post詳情
post請求引數一般是json串,引數放在from表單裡面;引數一般不可見,相對來說安全性高些
檢視get詳情
get請求引數一般直接放在url裡面
2.1註冊介面用例
RequestData:請求的資料
(開發制定的傳參方式)
RelyData:資料依賴
ResponseCode:響應code
ResponseData:響應資料
DataStore:儲存的依賴資料;如果存在資料庫裡面,在表裡增加一個欄位用來存依賴的資料
(儲存的方式是編寫介面自動化的人員來設定的儲存方式)
CheckPoint:檢查點
Active:是否執行
Status:執行用例的狀態,方便檢視用例是否執行成功
2.2登入介面用例
RequestData:請求的資料
(開發制定的傳參方式)
RelyData:資料依賴
(儲存的方式是編寫介面自動化的人員來設定的儲存方式)
ResponseCode:響應code
ResponseData:響應資料
DataStore:儲存的依賴資料;如果存在資料庫裡面,在表裡增加一個欄位用來存依賴的資料
(儲存的方式是編寫介面自動化的人員來設定的儲存方式)
CheckPoint:檢查點
Active:是否執行
Status:執行用例的狀態,方便檢視用例是否執行成功
ErrorInfo:case執行失敗,失敗的錯誤資訊;eg:是也本身的原因還是case設定失敗,還是其他原因
重點說明下RelyData:資料依賴
採取的是字典:key:value來儲存資料格式;
{"request":{"username":"register->1","password":"register->1"},"response":{"code":"register->1"}}
格式化之後:
{ "request":{ "username":"register->1","password":"register->1" },"response":{ "code":"register->1" } }
三、建立utils包:用來存放公共的類
3.1 ParseExcel.py 操作封裝excel的類(ParseExcel.py)
#encoding=utf-8 import openpyxl from openpyxl.styles import Border,Side,Font import time class ParseExcel(object): def __init__(self): self.workbook = None self.excelFile = None self.font = Font(color = None) # 設定字型的顏色 # 顏色對應的RGB值 self.RGBDict = {'red': 'FFFF3030','green': 'FF008B00'} def loadWorkBook(self,excelPathAndName): # 將excel檔案載入到記憶體,並獲取其workbook物件 try: self.workbook = openpyxl.load_workbook(excelPathAndName) except Exception as err: raise err self.excelFile = excelPathAndName return self.workbook def getSheetByName(self,sheetName): # 根據sheet名獲取該sheet物件 try: # sheet = self.workbook.get_sheet_by_name(sheetName) sheet = self.workbook[sheetName] return sheet except Exception as err: raise err def getSheetByIndex(self,sheetIndex): # 根據sheet的索引號獲取該sheet物件 try: # sheetname = self.workbook.get_sheet_names()[sheetIndex] sheetname = self.workbook.sheetnames[sheetIndex] except Exception as err: raise err # sheet = self.workbook.get_sheet_by_name(sheetname) sheet = self.workbook[sheetname] return sheet def getRowsNumber(self,sheet): # 獲取sheet中有資料區域的結束行號 return sheet.max_row def getColsNumber(self,sheet): # 獲取sheet中有資料區域的結束列號 return sheet.max_column def getStartRowNumber(self,sheet): # 獲取sheet中有資料區域的開始的行號 return sheet.min_row def getStartColNumber(self,sheet): # 獲取sheet中有資料區域的開始的列號 return sheet.min_column def getRow(self,sheet,rowNo): # 獲取sheet中某一行,返回的是這一行所有的資料內容組成的tuple, # 下標從1開始,sheet.rows[1]表示第一行 try: rows = [] for row in sheet.iter_rows(): rows.append(row) return rows[rowNo - 1] except Exception as err: raise err def getColumn(self,colNo): # 獲取sheet中某一列,返回的是這一列所有的資料內容組成tuple, # 下標從1開始,sheet.columns[1]表示第一列 try: cols = [] for col in sheet.iter_cols(): cols.append(col) return cols[colNo - 1] except Exception as err: raise err def getCellOfValue(self,coordinate = None,rowNo = None,colsNo = None): # 根據單元格所在的位置索引獲取該單元格中的值,下標從1開始,# sheet.cell(row = 1,column = 1).value, # 表示excel中第一行第一列的值 if coordinate != None: try: return sheet[coordinate] except Exception as err: raise err elif coordinate is None and rowNo is not None and \ colsNo is not None: try: return sheet.cell(row = rowNo,column = colsNo).value except Exception as err: raise err else: raise Exception("Insufficient Coordinates of cell !") def getCellOfObject(self,colsNo = None): # 獲取某個單元格的物件,可以根據單元格所在位置的數字索引, # 也可以直接根據excel中單元格的編碼及座標 # 如getCellObject(sheet,coordinate = 'A1') or # getCellObject(sheet,rowNo = 1,colsNo = 2) if coordinate != None: try: # return sheet.cell(coordinate = coordinate) return sheet[coordinate] except Exception as err: raise err elif coordinate == None and rowNo is not None and \ colsNo is not None: try: return sheet.cell(row = rowNo,column = colsNo) except Exception as err: raise err else: raise Exception("Insufficient Coordinates of cell !") def writeCell(self,content,colsNo = None,style = None): #根據單元格在excel中的編碼座標或者數字索引座標向單元格中寫入資料, # 下標從1開始,參style表示字型的顏色的名字,比如red,green if coordinate is not None: try: # sheet.cell(coordinate = coordinate).value = content sheet[coordinate] = content if style is not None: sheet[coordinate].\ font = Font(color = self.RGBDict[style]) self.workbook.save(self.excelFile) except Exception as e: raise e elif coordinate == None and rowNo is not None and \ colsNo is not None: try: sheet.cell(row = rowNo,column = colsNo).value = content if style: sheet.cell(row = rowNo,column = colsNo).\ font = Font(color = self.RGBDict[style]) self.workbook.save(self.excelFile) except Exception as e: raise e else: raise Exception("Insufficient Coordinates of cell !") def writeCellCurrentTime(self,colsNo = None): # 寫入當前的時間,下標從1開始 now = int(time.time()) #顯示為時間戳 timeArray = time.localtime(now) currentTime = time.strftime("%Y-%m-%d %H:%M:%S",timeArray) if coordinate is not None: try: sheet.cell(coordinate = coordinate).value = currentTime self.workbook.save(self.excelFile) except Exception as e: raise e elif coordinate == None and rowNo is not None \ and colsNo is not None: try: sheet.cell(row = rowNo,column = colsNo ).value = currentTime self.workbook.save(self.excelFile) except Exception as e: raise e else: raise Exception("Insufficient Coordinates of cell !") if __name__ == '__main__': # 測試程式碼 pe = ParseExcel() pe.loadWorkBook(r'D:\ProgramSourceCode\Python Source Code\WorkSpace\InterfaceFrame2018\inter_test_data.xlsx') sheetObj = pe.getSheetByName(u"API") print("通過名稱獲取sheet物件的名字:",sheetObj.title) # print help(sheetObj.rows) print("通過index序號獲取sheet物件的名字:",pe.getSheetByIndex(0).title) sheet = pe.getSheetByIndex(0) print(type(sheet)) print(pe.getRowsNumber(sheet)) #獲取最大行號 print(pe.getColsNumber(sheet)) #獲取最大列號 rows = pe.getRow(sheet,1) #獲取第一行 for i in rows: print(i.value) # # 獲取第一行第一列單元格內容 # print pe.getCellOfValue(sheet,colsNo = 1) # pe.writeCell(sheet,u'我愛祖國',rowNo = 10,colsNo = 10) # pe.writeCellCurrentTime(sheet,colsNo = 11)
3.2 封裝get/post請求(HttpClient.py)
import requests import json class HttpClient(object): def __init__(self): pass def request(self,requestMethod,requestUrl,paramsType,requestData,headers =None,**kwargs): if requestMethod == "post": print("---",type(requestData)) if paramsType == "form": response = self.__post(url = requestUrl,data = json.dumps(eval(requestData)),headers = headers,**kwargs) return response elif paramsType == "json": response = self.__post(url = requestUrl,json = json.dumps(eval(requestData)),**kwargs) return response elif requestMethod == "get": request_url = requestUrl if paramsType == "url": request_url = "%s%s" %(requestUrl,requestData) response = self.__get(url = request_url,params = requestData,**kwargs) return response def __post(self,url,data = None,json = None,headers=None,**kwargs): print("----") response = requests.post(url=url,data = data,json=json,headers=headers) return response def __get(self,params = None,**kwargs): response = requests.get(url,params = params,**kwargs) return response if __name__ == "__main__": hc = HttpClient() res = hc.request("get","http://39.106.41.11:8080/getBlogContent/","url",'2') print(res.json())
3.3 封裝MD5(md5_encrypt)
import hashlib def md5_encrypt(text): m5 = hashlib.md5() m5.update(text.encode("utf-8")) value = m5.hexdigest() return value if __name__ == "__main__": print(md5_encrypt("sfwe"))
3.4 封裝Log
import logging import logging.config from config.public_data import baseDir # 讀取日誌配置檔案 logging.config.fileConfig(baseDir + "\config\Logger.conf") # 選擇一個日誌格式 logger = logging.getLogger("example02")#或者example01 def debug(message): # 定義dubug級別日誌列印方法 logger.debug(message) def info(message): # 定義info級別日誌列印方法 logger.info(message) def warning(message): # 定義warning級別日誌列印方法 logger.warning(message)
3.5 封裝傳送Email類
import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.header import Header from ProjVar.var import * import os import smtplib from email import encoders from email.mime.base import MIMEBase from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.header import Header from email.utils import formataddr def send_mail(): mail_host="smtp.126.com" #設定伺服器 mail_user="testman1980" #使用者名稱 mail_pass="wulaoshi1980" #口令 sender = '[email protected]' receivers = ['[email protected]',"[email protected]"] # 接收郵件,可設定為你的QQ郵箱或者其他郵箱 # 建立一個帶附件的例項 message = MIMEMultipart() message['From'] = formataddr(["光榮之路吳老師","[email protected]"]) message['To'] = ','.join(receivers) subject = '自動化測試執行報告' message['Subject'] = Header(subject,'utf-8') message["Accept-Language"]="zh-CN" message["Accept-Charset"]="ISO-8859-1,utf-8,gbk" # 郵件正文內容 message.attach(MIMEText('最新執行的自動化測試報告,請參閱附件內容!','plain','utf-8')) # 構造附件1,傳送測試結果的excel檔案 att = MIMEBase('application','octet-stream') att.set_payload(open(ProjDirPath+"\\testdata\\testdata.xlsx",'rb').read()) att.add_header('Content-Disposition','attachment',filename=('gbk','',"自動化測試報告.xlsx")) encoders.encode_base64(att) message.attach(att) """ # 構造附件2,傳送當前目錄下的 runoob.txt 檔案 att2 = MIMEText(open('e:\\a.py','rb').read(),'base64','utf-8') att2["Content-Type"] = 'application/octet-stream' att2["Content-Disposition"] = 'attachment; filename="a.py"' message.attach(att2) """ try: smtpObj = smtplib.SMTP(mail_host) smtpObj.login(mail_user,mail_pass) smtpObj.sendmail(sender,receivers,message.as_string()) print("郵件傳送成功") except smtplib.SMTPException as e: print("Error: 無法傳送郵件",e) if __name__ == "__main__": send_mail()
四、 建立config包 用來存放公共的引數、配置檔案、長時間不變的變數值
建立public_data.py
import os # 整個專案的根目錄絕對路勁 baseDir = os.path.dirname(os.path.dirname(__file__)) # 獲取測試資料檔案的絕對路徑 file_path = baseDir + "/TestData/inter_test_data.xlsx" API_apiName = 2 API_requestUrl = 3 API_requestMothod = 4 API_paramsType = 5 API_apiTestCaseFileName = 6 API_active = 7 CASE_requestData = 1 CASE_relyData = 2 CASE_responseCode = 3 CASE_responseData = 4 CASE_dataStore = 5 CASE_checkPoint = 6 CASE_active = 7 CASE_status = 8 CASE_errorInfo = 9 # 儲存請求引數裡面依賴的資料 REQUEST_DATA = {} # 儲存響應物件中的依賴資料 RESPONSE_DATA = {} if __name__=="__main__": print(file_path) print(baseDir)
五、建立TestData目錄,用來存放測試檔案
inter_test_data.xlsx
六、建立action包,用來存放關鍵字函式
6.1 解決資料依賴 (GetRely.py)
from config.public_data import REQUEST_DATA,RESPONSE_DATA from utils.md5_encrypt import md5_encrypt REQUEST_DATA = {"使用者註冊":{"1":{"username":"zhangsan","password":"dfsdf23"},"headers":{"cookie":"asdfwerw"}}} RESPONSE_DATA = {"使用者註冊":{"1":{"code":"00"},"headers":{"age":2342}}} class GetRely(object): def __init__(self): pass @classmethod def get(self,dataSource,relyData,headSource = {}): print(type(dataSource)) print(dataSource) data = dataSource.copy() for key,value in relyData.items(): if key == "request": #說明應該去REQUEST_DATA中獲取 for k,v in value.items(): interfaceName,case_idx = v.split("->") val = REQUEST_DATA[interfaceName][case_idx][k] if k == "password": data[k] = md5_encrypt(val) else: data[k] = val elif key == "response": # 應該去RESPONSE_DATA中獲取 for k,case_idx = v.split("->") data[k] = RESPONSE_DATA[interfaceName][case_idx][k] elif key == "headers": if headSource: for key,value in value.items(): if key == "request": for k,v in value.items(): for i in v: headSource[i] = REQUEST_DATA[k]["headers"][i] elif key == "response": for i,val in value.items(): for j in val: headSource[j] = RESPONSE_DATA[i]["headers"][j] return "%s" %data if __name__ == "__main__": s = {"username": "","password": "","code":""} h = {"cookie":"123","age":332} rely = {"request": {"username": "使用者註冊->1","password": "使用者註冊->1"},"response":{"code":"使用者註冊->1"},"headers":{"request":{"使用者註冊":["cookie"]},"response":{"使用者註冊":["age"]}} } print(GetRely.get(s,rely,h))
6.2 解決資料儲存(RelyDataStore.py)
from config.public_data import RESPONSE_DATA,REQUEST_DATA class RelyDataStore(object): def __init__(self): pass @classmethod def do(cls,storePoint,apiName,caseId,request_source = {},response_source = {},req_headers={},res_headers = {}): for key,value in storePoint.items(): if key == "request": # 說明需要儲存的依賴資料來自請求引數,應該將資料儲存到REQUEST_DATA for i in value: if i in request_source: val = request_source[i] if apiName not in REQUEST_DATA: # 說明儲存資料的結構還未生成,需要指明資料儲存結構 REQUEST_DATA[apiName]={str(caseId): {i: val}} else: #說明儲存資料結構中最外層結構已存在 if str(caseId) in REQUEST_DATA[apiName]: REQUEST_DATA[apiName][str(caseId)][i] = val else: # 說明內層結構不完整,需要指明完整的結構 REQUEST_DATA[apiName][str(caseId)] = {i: val} else: print("請求引數中不存在欄位" + i) elif key == "response": #說明需要儲存的依賴資料來自介面的響應body,應該將資料儲存到RESPONSE_DATA for j in value: if j in response_source: val = response_source[j] if apiName not in RESPONSE_DATA: # 說明儲存資料的結構還未生成,需要指明資料儲存結構 RESPONSE_DATA[apiName]={str(caseId): {j: val}} else: #說明儲存資料結構中最外層結構已存在 if str(caseId) in RESPONSE_DATA[apiName]: RESPONSE_DATA[apiName][str(caseId)][j] = val else: # 說明內層結構不完整,需要指明完整的結構 RESPONSE_DATA[apiName][str(caseId)] = {j: val} else: print("介面的響應body中不存在欄位" + j) elif key == "headers": for k,v in value.items(): if k == "request": # 說明需要往REQUEST_DATA變數中寫入儲存資料 for item in v: if item in req_headers: header = req_headers[item] if "headers" in REQUEST_DATA[apiName]: REQUEST_DATA[apiName]["headers"][item] = header else: REQUEST_DATA[apiName]["headers"] = {item: header} elif k == "response": # 說明需要往RESPONSE_DATA變數中寫入儲存資料 for it in v: if it in res_headers: header = res_headers[it] if "headers" in RESPONSE_DATA[apiName]: RESPONSE_DATA[apiName]["headers"][it] = header else: RESPONSE_DATA[apiName]["headers"] = {item: header} print(REQUEST_DATA) print(RESPONSE_DATA) if __name__ == "__main__": r = {"username": "srwcx01","password": "wcx123wac1","email": "[email protected]"} req_h = {"cookie":"csdfw23"} res_h = {"age":597232} s = {"request": ["username","password"],"response": ["userid"],"headers":{"request":["cookie"],"response":["age"]}} res = {"userid": 12,"code": "00"} RelyDataStore.do(s,"register",1,r,res,req_headers=req_h,res_headers=res_h) print(REQUEST_DATA) print(RESPONSE_DATA)
6.3 校驗資料結果(CheckResult.py)
import re class CheckResult(object): def __init__(self): pass @classmethod def check(self,responseObj,checkPoint): responseBody = responseObj.json() # responseBody = {"code": "","userid": 12,"id": "12"} errorKey = {} for key,value in checkPoint.items(): if key in responseBody: if isinstance(value,(str,int)): # 等值校驗 if responseBody[key] != value: errorKey[key] = responseBody[key] elif isinstance(value,dict): sourceData = responseBody[key] if "value" in value: # 模糊匹配校驗 regStr = value["value"] rg = re.match(regStr,"%s" %sourceData) if not rg: errorKey[key] = sourceData elif "type" in value: # 資料型別校驗 typeS = value["type"] if typeS == "N": # 說明是整形校驗 if not isinstance(sourceData,int): errorKey[key] = sourceData else: errorKey[key] = "[%s] not exist" %key return errorKey if __name__ == "__main__": r = {"code": "00","id": 12} c = {"code": "00","userid": {"type": "N"},"id": {"value": "\d+"}} print(CheckResult.check(r,c))
6.4 往excel裡面寫結果
from config.public_data import * def write_result(wbObj,sheetObj,responseData,errorKey,rowNum): try: # 寫響應body wbObj.writeCell(sheetObj,content="%s" %responseData,rowNo = rowNum,colsNo=CASE_responseData) # 寫校驗結果狀態及錯誤資訊 if errorKey: wbObj.writeCell(sheetObj,content="%s" %errorKey,rowNo=rowNum,colsNo=CASE_errorInfo) wbObj.writeCell(sheetObj,content="faild",colsNo=CASE_status,style="red") else: wbObj.writeCell(sheetObj,content="pass",style="green") except Exception as err: raise err
七、建立Log目錄用來存放日誌
八、主函式
#encoding=utf-8 import requests import json from action.get_rely import GetRely from config.public_data import * from utils.ParseExcel import ParseExcel from utils.HttpClient import HttpClient from action.data_store import RelyDataStore from action.check_result import CheckResult from action.write_result import write_result from utils.Log import * def main(): parseE = ParseExcel() parseE.loadWorkBook(file_path) sheetObj = parseE.getSheetByName("API") activeList = parseE.getColumn(sheetObj,API_active) for idx,cell in enumerate(activeList[1:],2): if cell.value == "y": #需要被執行 RowObj = parseE.getRow(sheetObj,idx) apiName = RowObj[API_apiName -1].value requestUrl = RowObj[API_requestUrl - 1].value requestMethod = RowObj[API_requestMothod - 1].value paramsType = RowObj[API_paramsType - 1].value apiTestCaseFileName = RowObj[API_apiTestCaseFileName - 1].value # 下一步讀取用例sheet表,準備執行測試用例 caseSheetObj = parseE.getSheetByName(apiTestCaseFileName) caseActiveObj = parseE.getColumn(caseSheetObj,CASE_active) for c_idx,col in enumerate(caseActiveObj[1:],2): if col.value == "y": #需要執行的用例 caseRowObj = parseE.getRow(caseSheetObj,c_idx) requestData = caseRowObj[CASE_requestData - 1].value relyData = caseRowObj[CASE_relyData - 1].value responseCode = caseRowObj[CASE_responseCode - 1].value responseData = caseRowObj[CASE_responseData - 1].value dataStore = caseRowObj[CASE_dataStore -1].value checkPoint = caseRowObj[CASE_checkPoint - 1].value #傳送介面請求之前需要做一下資料依賴的處理 if relyData: logging.info("處理第%s個介面的第%s條用例的資料依賴!") requestData = GetRely.get(eval(requestData),eval(relyData)) httpC = HttpClient() response = httpC.request(requestMethod=requestMethod,requestData=requestData,requestUrl=requestUrl,paramsType=paramsType ) # 獲取到響應結果後,接下來進行資料依賴儲存邏輯實現 if response.status_code == 200: responseData = response.json() # 進行依賴資料儲存 if dataStore: RelyDataStore.do(eval(dataStore),c_idx - 1,eval(requestData),responseData) # 接下來就是校驗結果 else: logging.info("介面【%s】的第【%s】條用例,不需要進行依賴資料儲存!" %(apiName,c_idx)) if checkPoint: errorKey = CheckResult.check(response,eval(checkPoint)) write_result(parseE,caseSheetObj,c_idx) else: logging.info("介面【%s】的第【%s】條用例,執行失敗,介面協議code非200!" %(apiName,c_idx)) else: logging.info("第%s個介面的第%s條用例,被忽略執行!" %(idx -1,c_idx-1)) else: logging.info("第%s行的介面被忽略執行!" %(idx -1)) if __name__=="__main__": main()
框架待完善~~請各路神仙多多指教~~
到此這篇關於python+requests介面自動化框架的實現的文章就介紹到這了,更多相關python requests介面自動化內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!