python_requests介面測試框架,Excel作為案例資料來源
阿新 • • 發佈:2020-07-25
一、框架選單
1.1 common模組
1.2 其他
二、Excel介面測試案例編寫
三、讀取Excel測試封裝(核心封裝)
excel_utils.py 讀取Excel中的資料
import os

import xlrd  # third-party dependency: pip install xlrd


class ExcelUtils():
    """Read one worksheet of an Excel workbook, resolving merged cells.

    NOTE(review): xlrd 2.0+ only reads legacy ``.xls`` files, while the demo
    below opens an ``.xlsx`` file -- presumably this targets xlrd 1.x; confirm
    the pinned version.
    """

    def __init__(self, file_path, sheet_name):
        """Open ``file_path`` and keep the sheet called ``sheet_name``."""
        self.file_path = file_path
        self.sheet_name = sheet_name
        self.sheet = self.get_sheet()  # the whole worksheet object

    def get_sheet(self):
        """Return the worksheet named ``self.sheet_name`` from the workbook."""
        workbook = xlrd.open_workbook(self.file_path)
        return workbook.sheet_by_name(self.sheet_name)

    def get_row_count(self):
        """Return the number of rows in the sheet."""
        return self.sheet.nrows

    def get_col_count(self):
        """Return the number of columns in the sheet."""
        return self.sheet.ncols

    def __get_cell_value(self, row_index, col_index):
        """Return the raw value stored at (row_index, col_index)."""
        return self.sheet.cell_value(row_index, col_index)

    def get_merged_info(self):
        """Return the sheet's merged ranges as (rlow, rhigh, clow, chigh) tuples."""
        return self.sheet.merged_cells

    def get_merged_cell_value(self, row_index, col_index):
        """Return the value of a cell, whether ordinary or part of a merged range.

        A cell inside a merged range yields the value of the range's top-left
        cell; any other cell yields its own value.  The own-value fallback also
        covers sheets without merged ranges, for which the previous
        implementation returned ``None``.
        """
        for (rlow, rhigh, clow, chigh) in self.get_merged_info():
            # The upper bounds reported by xlrd are exclusive.
            if rlow <= row_index < rhigh and clow <= col_index < chigh:
                return self.__get_cell_value(rlow, clow)
        return self.__get_cell_value(row_index, col_index)

    def get_sheet_data_by_dict(self):
        """Return every data row as a dict keyed by the header (first) row.

        Returns:
            A list with one dict per row below the header; an empty list when
            the sheet has no rows at all.
        """
        row_count = self.get_row_count()
        if row_count == 0:
            return []
        first_row = self.sheet.row(0)  # header row
        col_count = self.get_col_count()
        return [
            {
                first_row[col].value: self.get_merged_cell_value(row, col)
                for col in range(col_count)
            }
            for row in range(1, row_count)
        ]


if __name__ == '__main__':
    current_path = os.path.dirname(__file__)
    excel_path = os.path.join(current_path, '..', 'samples/data/test_case.xlsx')
    excelUtils = ExcelUtils(excel_path, "Sheet1")
    for row in excelUtils.get_sheet_data_by_dict():
        print(row)
testdata_utils.py 讀取Excel中的資料後處理成需要的資料
import os

from common.excel_utils import ExcelUtils
from common.config_utils import config

current_path = os.path.dirname(__file__)
test_data_path = os.path.join(current_path, '..', config.CASE_DATA_PATH)


class TestdataUtils():
    """Load the Excel test rows and regroup them per test case."""

    def __init__(self, test_data_path=test_data_path):
        """Read all rows of "Sheet1" from the workbook at ``test_data_path``."""
        self.test_data_path = test_data_path
        self.test_data = ExcelUtils(test_data_path, "Sheet1").get_sheet_data_by_dict()
        # NOTE(review): SqlUtils is not imported anywhere in the visible text of
        # this module, so this line raises NameError as shown -- confirm where
        # SqlUtils lives and import it.
        self.test_data_by_mysql = SqlUtils().get_mysql_test_case_info()

    def __get_testcase_data_dict(self):
        """Group the rows by their '測試用例編號' value, keeping row order."""
        grouped = {}
        for row in self.test_data:
            case_id = row['測試用例編號']
            if case_id not in grouped:
                grouped[case_id] = []
            grouped[case_id].append(row)
        return grouped

    def def_testcase_data_list(self):
        """Return ``[{"case_id": ..., "case_info": [rows...]}, ...]``."""
        return [
            {"case_id": case_id, "case_info": steps}
            for case_id, steps in self.__get_testcase_data_dict().items()
        ]


if __name__ == "__main__":
    testdataUtils = TestdataUtils()
    for case in testdataUtils.def_testcase_data_list():
        print(case)
四、request封裝(核心封裝)
requests_utils.py 包含post請求,get請求,異常,呼叫斷言
import ast
import re

import requests
import jsonpath
from requests.exceptions import RequestException
from requests.exceptions import ProxyError
from requests.exceptions import ConnectionError

from common.config_utils import config
from common.check_utils import CheckUtils

# Matches "${name}" placeholders inside request parameters / bodies.
_VARIABLE_PATTERN = re.compile(r'\$\{\w+\}')


class RequestsUtils():
    """Send the HTTP steps of a test case over one shared session.

    Values extracted from earlier responses are kept in ``temp_variables`` and
    substituted into ``${name}`` placeholders of later steps.
    """

    def __init__(self):
        self.hosts = config.hosts
        # The header name was previously misspelled as "ContentType".
        self.headers = {"Content-Type": "application/json;charset=utf-8"}
        self.session = requests.session()
        self.temp_variables = {}

    @staticmethod
    def _error_result(message, code=4):
        """Build a failure dict that also carries the keys the test layer reads."""
        return {'code': code, 'result': message,
                'check_result': False, 'message': message}

    @staticmethod
    def _parse_literal(text):
        """Evaluate a Python-literal cell; a blank or missing cell yields None."""
        if text is None or not str(text).strip():
            return None
        return ast.literal_eval(text)

    def _substitute_variables(self, text):
        """Replace each ``${name}`` in ``text`` with the quoted stored value."""
        for placeholder in _VARIABLE_PATTERN.findall(text):
            value = self.temp_variables.get(placeholder[2:-1])
            text = text.replace(placeholder, '"%s"' % value)
        return text

    def _extract_variable(self, step_info, response):
        """Store a value taken from ``response`` when the step asks for one."""
        mode = step_info["取值方式"]
        if mode == "json取值":
            value = jsonpath.jsonpath(response.json(), step_info["取值程式碼"])[0]
        elif mode == "正則取值":
            value = re.findall(step_info["取值程式碼"], response.text)[0]
        else:
            return
        self.temp_variables[step_info["傳值變數"]] = value

    def _send(self, step_info, send):
        """Run one request via ``send(url)``, extract variables, check the result.

        Every failure is converted to a ``code: 4`` result dict instead of
        propagating, so one broken step cannot abort the whole run.
        """
        # .get() keeps the handlers below from raising KeyError themselves.
        name = step_info.get("介面名稱")
        try:
            response = send(self.hosts + step_info["請求地址"])
            response.encoding = response.apparent_encoding
            self._extract_variable(step_info, response)
            result = CheckUtils(response).run_check(
                step_info['期望結果型別'], step_info['期望結果'])
        except ProxyError:
            result = self._error_result('[%s]請求:代理錯誤異常' % name)
        except ConnectionError:
            result = self._error_result('[%s]請求:連線超時異常' % name)
        except RequestException as e:
            result = self._error_result(
                '[%s]請求:Request異常,原因:%s' % (name, str(e)))
        except Exception as e:
            result = self._error_result(
                '[%s]請求:系統異常,原因:%s' % (name, str(e)))
        return result

    def __get(self, get_info):
        """Send a GET step and return its check result."""
        return self._send(
            get_info,
            lambda url: self.session.get(
                url=url,
                params=self._parse_literal(get_info["請求引數(get)"]),
            ),
        )

    def __post(self, post_info):
        """Send a POST step (JSON body) and return its check result."""
        return self._send(
            post_info,
            lambda url: self.session.post(
                url=url,
                headers=self.headers,
                params=self._parse_literal(post_info["請求引數(get)"]),
                json=self._parse_literal(post_info["提交資料(post)"]),
            ),
        )

    def request(self, step_info):
        """Run a single step described by ``step_info`` and return a result dict.

        The caller's dict is left untouched: placeholder substitution happens
        on a shallow copy, so the same case data can be executed repeatedly.
        """
        try:
            step = dict(step_info)
            request_type = step["請求方式"]
            step["請求引數(get)"] = self._substitute_variables(step["請求引數(get)"])
            if request_type == "get":
                result = self.__get(step)
            elif request_type == "post":
                step["提交資料(post)"] = self._substitute_variables(step["提交資料(post)"])
                result = self.__post(step)
            else:
                result = self._error_result('請求方式不支援', code=1)
        except Exception as e:
            result = self._error_result(
                '用例編號[%s]的[%s]步驟出現系統異常,原因:%s'
                % (step_info.get('測試用例編號'), step_info.get("測試用例步驟"), str(e)))
        return result

    def request_by_step(self, step_infos):
        """Run the steps in order, stopping at the first one whose code is not 0.

        Returns:
            The result of the last executed step, or a failure result when
            ``step_infos`` is empty (previously an UnboundLocalError).
        """
        self.temp_variables = {}
        temp_result = self._error_result('用例沒有可執行的步驟')
        for step_info in step_infos:
            temp_result = self.request(step_info)
            if temp_result['code'] != 0:
                break
        return temp_result


if __name__ == "__main__":
    case_info = [
        {'請求方式': 'get', '請求地址': '/cgi-bin/token',
         '請求引數(get)': '{"grant_type":"client_credential","appid":"wxXXXXXxc16","secret":"XXXXXXXX"}',
         '提交資料(post)': '', '取值方式': 'json取值', '傳值變數': 'token',
         '取值程式碼': '$.access_token',
         '期望結果型別': '正則匹配',
         '期望結果': '{"access_token":"(.+?)","expires_in":(.+?)}'},
        {'請求方式': 'post', '請求地址': '/cgi-bin/tags/create',
         '請求引數(get)': '{"access_token":${token}}',
         '提交資料(post)': '{"tag" : {"name" : "衡東"}}', '取值方式': '無',
         '傳值變數': '', '取值程式碼': '',
         '期望結果型別': '正則匹配',
         '期望結果': '{"tag":{"id":(.+?),"name":"衡東"}}'},
    ]
    RequestsUtils().request_by_step(case_info)
五、斷言封裝(核心封裝)
check_utils.py 斷言封裝,與實際結果核對
import re
import ast


class CheckUtils():
    """Compare an HTTP response with the expectation declared for a test step.

    ``check_response`` is expected to expose ``reason``, ``status_code``,
    ``headers``, ``text`` and ``json()`` (as a requests response does).
    """

    def __init__(self, check_response=None):
        self.ck_response = check_response
        # Maps the expectation-type label from the test data to its checker.
        self.ck_rules = {
            '無': self.no_check,
            'json鍵是否存在': self.check_key,
            'json鍵值對': self.check_keyvalue,
            '正則匹配': self.check_regexp
        }
        self.pass_result = self._build_result(code=0, check_result=True)
        self.fail_result = self._build_result(code=2, check_result=False)

    def _build_result(self, code, check_result):
        """Return a result dict describing the response with the given verdict."""
        response = self.ck_response
        # getattr keeps the documented ``check_response=None`` default usable.
        return {
            'code': code,
            'response_reason': getattr(response, 'reason', None),
            'response_code': getattr(response, 'status_code', None),
            'response_headers': getattr(response, 'headers', None),
            'response_body': getattr(response, 'text', None),
            'check_result': check_result,
            'message': ''  # free-form detail, e.g. for log output
        }

    def no_check(self, check_data=None):
        """Always pass.  ``check_data`` is accepted (and ignored) because
        ``run_check`` calls every rule with one argument."""
        return self.pass_result

    def check_key(self, check_data=None):
        """Pass when every comma-separated key in ``check_data`` is a top-level
        key of the JSON response; otherwise fail and list the missing keys."""
        actual_keys = self.ck_response.json().keys()
        expected_keys = [key.strip() for key in check_data.split(',')]
        missing_keys = [key for key in expected_keys if key not in actual_keys]
        if missing_keys:
            self.fail_result['message'] = '響應中缺少鍵:%s' % ','.join(missing_keys)
            return self.fail_result
        return self.pass_result

    def check_keyvalue(self, check_data=None):
        """Pass when every key/value pair of the dict literal ``check_data``
        appears at the top level of the JSON response."""
        actual_items = self.ck_response.json().items()
        # literal_eval turns the string into a dict without executing code.
        expected_items = ast.literal_eval(check_data).items()
        wrong_items = [item for item in expected_items if item not in actual_items]
        if wrong_items:
            self.fail_result['message'] = '響應中鍵值對不匹配:%s' % wrong_items
            return self.fail_result
        return self.pass_result

    def check_regexp(self, check_data=None):
        """Pass when the regular expression ``check_data`` matches somewhere in
        the response text."""
        if re.search(check_data, self.ck_response.text):
            return self.pass_result
        return self.fail_result

    def run_check(self, check_type=None, check_data=None):
        """Run the rule named ``check_type`` against the response.

        Fails immediately when the status code is not 200 or the rule name is
        unknown; the reason is stored under ``'message'``.
        """
        code = self.ck_response.status_code
        if code != 200:
            self.fail_result['message'] = '請求的響應狀態碼非200,實際為%s' % str(code)
            return self.fail_result
        if check_type not in self.ck_rules:
            self.fail_result['message'] = '不支援%s判斷方法' % check_type
            return self.fail_result
        return self.ck_rules[check_type](check_data)


if __name__ == "__main__":
    import json
    from types import SimpleNamespace

    def _fake_response(body):
        """Minimal stand-in exposing the attributes CheckUtils reads."""
        return SimpleNamespace(text=body, status_code=200, reason='OK',
                               headers={}, json=lambda: json.loads(body))

    # Key-existence check: "expires_in" is absent from the response -> fails.
    print(CheckUtils(_fake_response('{"access_token":"hello","expires_":7200}'))
          .check_key("access_token,expires_in"))
    # Key/value check: the pair is absent from the response -> fails.
    print(CheckUtils(_fake_response('{"access_token":"hello","expires_i":7200}'))
          .check_keyvalue('{"expires_in": 7200}'))
    # Regular-expression check: matches -> passes.
    print(CheckUtils(_fake_response('{"access_token":"hello","expires_in":7200}'))
          .check_regexp('"expires_in":(.+?)'))
    # Regular-expression check: no match -> fails.
    print(CheckUtils(_fake_response('{"access_token":"hello","expires":7200}'))
          .check_regexp('"expires_in":(.+?)'))
六、api_testcase下的api_test.py 封裝
import warnings
import unittest

import paramunittest

from common.testdata_utils import TestdataUtils
from common.requests_utils import RequestsUtils

# Excel is the data source here, so the cases come from
# def_testcase_data_list().  The original text called
# def_testcase_data_list_by_mysql(), which the TestdataUtils shown in this
# article does not define; switch back only if that MySQL-backed method exists.
case_infos = TestdataUtils().def_testcase_data_list()


@paramunittest.parametrized(
    *case_infos
)
class APITest(paramunittest.ParametrizedTestCase):
    """One parametrized test per test case loaded from the data source."""

    def setUp(self) -> None:
        # Suppress ResourceWarning output during the run.
        warnings.simplefilter('ignore', ResourceWarning)

    def setParameters(self, case_id, case_info):
        """Receive the ``case_id`` / ``case_info`` pair for this instance."""
        self.case_id = case_id
        self.case_info = case_info

    def test_api_common_function(self):
        '''測試描述'''
        # Rename the test after the case so that reports show the case id/name.
        self._testMethodName = self.case_info[0].get("測試用例編號")
        self._testMethodDoc = self.case_info[0].get("測試用例名稱")
        actual_result = RequestsUtils().request_by_step(self.case_info)
        # Request-level failures carry their reason under 'result' rather than
        # 'message'; fall back to it so the assertion message is never empty.
        failure_reason = actual_result.get('message') or actual_result.get('result')
        self.assertTrue(actual_result.get('check_result'), failure_reason)


if __name__ == '__main__':
    unittest.main()
七、common下的log_utils.py 封裝
import os
import logging
import time

from common.config_utils import config

current_path = os.path.dirname(__file__)
log_output_path = os.path.join(current_path, '..', config.LOG_PATH)


class LogUtils():
    """Configure the shared "ApiTestLog" logger (console + daily log file)."""

    def __init__(self, log_path=log_output_path):
        """Attach console and file handlers to the logger.

        Args:
            log_path: Directory for the log file.  It is created when missing.
                (The previous code ignored this argument and always used the
                module-level default.)

        Handlers are attached only once per process: the named logger is a
        process-wide singleton, so adding handlers on every instantiation
        would emit each record multiple times.
        """
        os.makedirs(log_path, exist_ok=True)
        self.log_name = os.path.join(
            log_path, 'ApiTest_%s.log' % time.strftime('%Y_%m_%d'))
        self.logger = logging.getLogger("ApiTestLog")
        self.logger.setLevel(config.LOG_LEVEL)
        if not self.logger.handlers:
            formatter = logging.Formatter(
                "%(asctime)s %(name)s %(levelname)s %(message)s")
            console_handler = logging.StreamHandler()  # console output
            file_handler = logging.FileHandler(
                self.log_name, 'a', encoding='utf-8')  # file output
            for handler in (console_handler, file_handler):
                handler.setFormatter(formatter)
                self.logger.addHandler(handler)

    def get_logger(self):
        """Return the configured logger."""
        return self.logger


logger = LogUtils().get_logger()  # module-level instance shared by importers

if __name__ == '__main__':
    logger.info('hello')
八、common下的config_utils.py的封裝
配置檔案的編寫:
對配置檔案的讀取封裝:
import os
import configparser

current_path = os.path.dirname(__file__)
cfgpath = os.path.join(current_path, "../conf/local_config.ini")
print(cfgpath)


class ConfigUtils:
    """Typed, read-only access to the values of the framework's ini file."""

    def __init__(self, config_path=cfgpath):
        """Parse the ini file at ``config_path`` (read as UTF-8)."""
        parser = configparser.ConfigParser()
        parser.read(config_path, encoding="utf-8")
        self.__conf = parser

    def read_ini(self, sec, option):
        """Return the raw string stored under ``[sec]`` / ``option``."""
        return self.__conf.get(sec, option)

    # --- [default] ---------------------------------------------------------
    @property
    def hosts(self):
        return self.read_ini('default', 'hosts')

    # --- [path] ------------------------------------------------------------
    @property
    def LOG_PATH(self):
        return self.read_ini('path', 'LOG_PATH')

    @property
    def CASE_DATA_PATH(self):
        return self.read_ini('path', 'CASE_DATA_PATH')

    @property
    def REPORT_PATH(self):
        return self.read_ini('path', 'REPORT_PATH')

    # --- [log] -------------------------------------------------------------
    @property
    def LOG_LEVEL(self):
        # Stored as text in the ini file; exposed as an int.
        return int(self.read_ini('log', 'LOG_LEVEL'))

    # --- [email] -----------------------------------------------------------
    @property
    def smtp_server(self):
        return self.read_ini('email', 'smtp_server')

    @property
    def smtp_sender(self):
        return self.read_ini('email', 'smtp_sender')

    @property
    def smtp_password(self):
        return self.read_ini('email', 'smtp_password')

    @property
    def smtp_receiver(self):
        return self.read_ini('email', 'smtp_receiver')

    @property
    def smtp_cc(self):
        return self.read_ini('email', 'smtp_cc')

    @property
    def smtp_subject(self):
        return self.read_ini('email', 'smtp_subject')


config = ConfigUtils()

if __name__ == '__main__':
    current_path = os.path.dirname(__file__)
    cfgpath = os.path.join(current_path, "../conf/local_config.ini")
    config_u = ConfigUtils()
    print(config_u.hosts)
    print(config_u.LOG_LEVEL)
九、test_runner下的run_case.py 封裝
# NOTE(review): the imports and the definitions of test_case_path /
# test_report_path are not part of the visible text of this module; this block
# relies on unittest, HTMLTestReportCN, EmailUtils and those two paths being
# in scope.
class RunCase():
    """Discover the API tests, run them and write an HTML report."""

    def __init__(self):
        self.test_case_path = test_case_path
        self.report_path = test_report_path
        self.title = 'P1P2介面自動化測試報告'
        self.description = '自動化介面測試框架'
        self.tester = '測試開發組'

    def load_test_suite(self):
        """Return a suite with every test discovered in ``api_test.py``."""
        discover = unittest.defaultTestLoader.discover(
            start_dir=self.test_case_path,
            pattern='api_test.py',
            top_level_dir=self.test_case_path)
        all_suite = unittest.TestSuite()
        all_suite.addTest(discover)
        return all_suite

    def run(self):
        """Run the suite and return the path of the generated report file."""
        report_dir = HTMLTestReportCN.ReportDirectory(self.report_path)
        report_dir.create_dir(self.title)
        report_file_path = HTMLTestReportCN.GlobalMsg.get_value('report_path')
        # "with" closes the report file even when the runner raises.
        with open(report_file_path, 'wb') as fp:
            runner = HTMLTestReportCN.HTMLTestRunner(
                stream=fp,
                title=self.title,
                description=self.description,
                tester=self.tester)
            runner.run(self.load_test_suite())
        return report_file_path


if __name__ == '__main__':
    report_path = RunCase().run()
    with open(report_path, 'rb') as report_file:
        report_body = report_file.read()
    EmailUtils(report_body, report_path).send_mail()
十、common下的email_utils.py 封裝
import os
import smtplib
from email.mime.application import MIMEApplication
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

from common.config_utils import config


class EmailUtils():
    """Send the test report by e-mail using the SMTP settings from ``config``."""

    def __init__(self, smtp_body, smtp_attch_path=None):
        """
        Args:
            smtp_body: HTML content used as the message body.
            smtp_attch_path: Optional path of a file to attach.
        """
        self.smtp_server = config.smtp_server
        self.smtp_sender = config.smtp_sender
        self.smtp_password = config.smtp_password
        self.smtp_receiver = config.smtp_receiver
        self.smtp_cc = config.smtp_cc
        self.smtp_subject = config.smtp_subject
        self.smtp_body = smtp_body
        self.smtp_attch = smtp_attch_path

    def mail_message_body(self):
        """Build and return the multipart message (HTML body + attachment)."""
        message = MIMEMultipart()
        message['from'] = self.smtp_sender
        message['to'] = self.smtp_receiver
        message['Cc'] = self.smtp_cc
        message['subject'] = self.smtp_subject
        message.attach(MIMEText(self.smtp_body, 'html', 'utf-8'))
        if self.smtp_attch:
            with open(self.smtp_attch, 'rb') as attachment:
                attachment_bytes = attachment.read()
            # MIMEApplication defaults to application/octet-stream with base64
            # transfer encoding.  The earlier MIMEText(..., 'base64') approach
            # ended up with two Content-Type headers, because assigning a
            # header on a message appends instead of replacing.
            attach_file = MIMEApplication(attachment_bytes)
            attach_file.add_header(
                'Content-Disposition', 'attachment',
                filename=('gbk', '', os.path.basename(self.smtp_attch)))
            message.attach(attach_file)
        return message

    def send_mail(self):
        """Log in to the SMTP server and send the message to receivers and Cc."""
        raw_addresses = self.smtp_receiver.split(",") + self.smtp_cc.split(",")
        # A blank Cc setting would otherwise produce an empty recipient address.
        recipients = [addr.strip() for addr in raw_addresses if addr.strip()]
        # The context manager issues QUIT and closes the socket on exit.
        with smtplib.SMTP() as smtp:
            smtp.connect(self.smtp_server)
            smtp.login(user=self.smtp_sender, password=self.smtp_password)
            smtp.sendmail(self.smtp_sender, recipients,
                          self.mail_message_body().as_string())


if __name__ == '__main__':
    html_path = os.path.dirname(__file__) + '/../test_reports/介面自動化測試報告V1.1/介面自動化測試報告V1.1.html'
    EmailUtils('<h3 align="center">自動化測試報告</h3>', html_path).send_mail()