1. 程式人生 > 實用技巧 > python_requests介面測試框架,Excel作為案例資料來源

python_requests介面測試框架,Excel作為案例資料來源

一、框架選單

1.1 common模組

1.2 其他

二、Excel介面測試案例編寫

三、讀取Excel測試封裝(核心封裝)

excel_utils.py 讀取Excel中的資料

import os
import xlrd   #內建模組、第三方模組pip install  自定義模組


class ExcelUtils():
    """Read test-case rows from one worksheet of an Excel workbook via xlrd.

    Cells that belong to a merged range report the value stored in the
    top-left cell of that range, so every row of a multi-row case carries
    the shared columns.
    """

    def __init__(self, file_path, sheet_name):
        """Open ``file_path`` and keep the worksheet called ``sheet_name``."""
        self.file_path = file_path
        self.sheet_name = sheet_name
        self.sheet = self.get_sheet()  # the whole worksheet object

    def get_sheet(self):
        """Return the worksheet ``self.sheet_name`` of ``self.file_path``."""
        wb = xlrd.open_workbook(self.file_path)
        return wb.sheet_by_name(self.sheet_name)

    def get_row_count(self):
        """Return the number of rows in the worksheet."""
        return self.sheet.nrows

    def get_col_count(self):
        """Return the number of columns in the worksheet."""
        return self.sheet.ncols

    def __get_cell_value(self, row_index, col_index):
        """Return the raw value stored at (``row_index``, ``col_index``)."""
        return self.sheet.cell_value(row_index, col_index)

    def get_merged_info(self):
        """Return the sheet's merged ranges as (rlow, rhigh, clow, chigh) tuples."""
        return self.sheet.merged_cells

    def get_merged_cell_value(self, row_index, col_index):
        """Return the value of a cell, resolving merged ranges.

        A cell inside a merged range yields the value of the range's
        top-left cell; any other cell yields its own value. The upper
        bounds ``rhigh``/``chigh`` are treated as exclusive.
        """
        for (rlow, rhigh, clow, chigh) in self.get_merged_info():
            if rlow <= row_index < rhigh and clow <= col_index < chigh:
                return self.__get_cell_value(rlow, clow)
        # Not part of any merged range. This fallback also covers sheets
        # without merged cells, which previously produced None for every cell.
        return self.__get_cell_value(row_index, col_index)

    def get_sheet_data_by_dict(self):
        """Return every data row as a dict keyed by the header row.

        Row 0 supplies the keys; rows 1.. supply the values. A sheet with
        no rows yields an empty list.
        """
        if self.get_row_count() == 0:
            return []
        headers = [cell.value for cell in self.sheet.row(0)]  # first row = column titles
        col_count = self.get_col_count()
        all_data_list = []
        for row in range(1, self.get_row_count()):
            all_data_list.append({
                headers[col]: self.get_merged_cell_value(row, col)
                for col in range(col_count)
            })
        return all_data_list


if __name__=='__main__':
    # Manual smoke run against the sample workbook shipped next to this package.
    current_path = os.path.dirname(__file__)
    excel_path = os.path.join( current_path,'..','samples/data/test_case.xlsx' )
    excelUtils = ExcelUtils(excel_path,"Sheet1")
    for row in excelUtils.get_sheet_data_by_dict():
        print( row )
View Code

testdata_utils.py 讀取Excel中的資料後處理成需要的資料

import os
from common.excel_utils import ExcelUtils
from common.config_utils import config

# Directory that contains this module.
current_path = os.path.dirname(__file__)
# Default case-data file: config.CASE_DATA_PATH joined onto this module's parent directory.
test_data_path = os.path.join( current_path,'..', config.CASE_DATA_PATH )

class TestdataUtils():
    """Group the rows read from the case sheet into one record per test case."""

    def __init__(self,test_data_path = test_data_path):
        """Load all rows of "Sheet1" from ``test_data_path`` and the MySQL case info.

        NOTE(review): ``SqlUtils`` is not imported in the visible part of this
        module - confirm where it comes from, otherwise construction fails
        with NameError.
        """
        self.test_data_path = test_data_path
        excel_reader = ExcelUtils(test_data_path,"Sheet1")
        self.test_data = excel_reader.get_sheet_data_by_dict()
        sql_reader = SqlUtils()
        self.test_data_by_mysql = sql_reader.get_mysql_test_case_info()

    def __get_testcase_data_dict(self):
        """Return ``{case id: [row, ...]}`` keeping the rows in sheet order."""
        grouped = {}
        for row in self.test_data:
            case_id = row['測試用例編號']
            if case_id not in grouped:
                grouped[case_id] = []
            grouped[case_id].append(row)
        return grouped

    def def_testcase_data_list(self):
        """Return a list of ``{"case_id": ..., "case_info": [rows]}`` records."""
        return [
            {"case_id": case_id, "case_info": rows}
            for case_id, rows in self.__get_testcase_data_dict().items()
        ]


if __name__=="__main__":
    # Manual smoke run: print every grouped case record.
    testdataUtils = TestdataUtils()
    case_records = testdataUtils.def_testcase_data_list()
    for i in case_records:
        print( i )
View Code

四、request封裝(核心封裝)

requests_utils.py   包含post請求,get請求,異常,呼叫斷言
import ast
import re
import requests
import jsonpath
from requests.exceptions import RequestException
from requests.exceptions import ProxyError
from requests.exceptions import ConnectionError
from common.config_utils import config
from common.check_utils import CheckUtils

class RequestsUtils():
    """Send the HTTP steps of an API test case and check each response.

    A step is a dict keyed by the column titles of the case sheet
    (請求方式, 請求地址, 請求引數(get), 提交資料(post), 取值方式, 取值程式碼,
    傳值變數, 期望結果型別, 期望結果, ...). Values captured from a response are
    kept in ``temp_variables`` and substituted into ``${name}`` placeholders
    of later steps.

    Every public method returns a result dict. ``code`` is 0 when the check
    passed, 1 for an unsupported request method and 4 when an exception was
    raised; other codes come straight from ``CheckUtils``.
    """

    # Matches a ``${name}`` placeholder; group 1 is the variable name.
    _PLACEHOLDER = re.compile(r'\$\{(\w+)\}')

    def __init__(self):
        self.hosts =  config.hosts
        # "Content-Type" is the real header name; the former "ContentType"
        # key was sent as an unknown header.
        self.headers = {"Content-Type":"application/json;charset=utf-8"}
        self.session = requests.session()
        self.temp_variables = {}

    @staticmethod
    def _error_result(code, text):
        """Build a failure result that also carries the keys the test case reads."""
        return {'code': code, 'result': text, 'check_result': False, 'message': text}

    @staticmethod
    def _parse_cell(cell_text):
        """Evaluate a cell holding a Python literal; a blank cell means "no value"."""
        if isinstance(cell_text, str) and not cell_text.strip():
            return None
        return ast.literal_eval(cell_text)

    def _fill_placeholders(self, text):
        """Replace each ``${name}`` in ``text`` by the quoted captured value."""
        return self._PLACEHOLDER.sub(
            lambda match: '"%s"' % self.temp_variables.get(match.group(1)),
            text,
        )

    def _capture_variable(self, step_info, response):
        """Store a value taken from ``response`` when the step asks for one."""
        mode = step_info["取值方式"]
        if mode == "json取值":
            value = jsonpath.jsonpath( response.json(),step_info["取值程式碼"] )[0]
        elif mode == "正則取值":
            value = re.findall(step_info["取值程式碼"],response.text)[0]
        else:
            return
        self.temp_variables[ step_info["傳值變數"] ] = value

    def _execute(self, step_info, send):
        """Run ``send()``, capture variables, check the response, map errors to results."""
        name = step_info.get("介面名稱")
        try:
            response = send()
            response.encoding = response.apparent_encoding
            self._capture_variable(step_info, response)
            result = CheckUtils(response).run_check(step_info['期望結果型別'], step_info['期望結果'])
        # Most specific exception first: ProxyError derives from ConnectionError,
        # which derives from RequestException.
        except ProxyError:
            result = self._error_result(4, '[%s]請求:代理錯誤異常' % name)
        except ConnectionError:
            result = self._error_result(4, '[%s]請求:連線超時異常' % name)
        except RequestException as e:
            result = self._error_result(4, '[%s]請求:Request異常,原因:%s' % (name, e))
        except Exception as e:
            result = self._error_result(4, '[%s]請求:系統異常,原因:%s' % (name, e))
        return result

    def __get(self,get_info):
        """Send a GET step and return its check result."""
        return self._execute(get_info, lambda: self.session.get(
            url=self.hosts + get_info["請求地址"],
            params=self._parse_cell(get_info["請求引數(get)"]),
        ))

    def __post(self,post_info):
        """Send a POST step (JSON body) and return its check result."""
        return self._execute(post_info, lambda: self.session.post(
            url=self.hosts + post_info["請求地址"],
            headers=self.headers,
            params=self._parse_cell(post_info["請求引數(get)"]),
            json=self._parse_cell(post_info["提交資料(post)"]),
        ))

    def request(self,step_info):
        """Execute one step and return its result dict.

        The step is copied first so that placeholder substitution never
        rewrites the caller's case data.
        """
        step_info = dict(step_info)
        try:
            request_type = step_info["請求方式"]
            step_info["請求引數(get)"] = self._fill_placeholders(step_info["請求引數(get)"])
            if request_type == "get":
                result = self.__get( step_info )
            elif request_type == "post":
                step_info["提交資料(post)"] = self._fill_placeholders(step_info["提交資料(post)"])
                result = self.__post( step_info )
            else:
                result = self._error_result(1, '請求方式不支援')
        except Exception as e:
            # .get() keeps the handler itself from failing on incomplete step dicts.
            result = self._error_result(
                4,
                '用例編號[%s]的[%s]步驟出現系統異常,原因:%s'
                % (step_info.get('測試用例編號'), step_info.get("測試用例步驟"), e),
            )
        return result

    def request_by_step(self,step_infos):
        """Run the steps in order, stopping at the first one whose code is not 0.

        Returns the result of the last executed step; a case without steps
        yields a code-4 failure instead of raising.
        """
        self.temp_variables = {}
        temp_result = self._error_result(4, '用例未包含任何測試步驟')
        for step_info in step_infos:
            temp_result = self.request( step_info )
            if temp_result['code']!=0:
                break
        return temp_result


if __name__=="__main__":
    # Manual demo: a two-step case. Step 1 fetches a token and stores it in the
    # variable "token"; step 2 references it through the ${token} placeholder.

    case_info = [
        {'請求方式': 'get', '請求地址': '/cgi-bin/token', '請求引數(get)': '{"grant_type":"client_credential","appid":"wxXXXXXxc16","secret":"XXXXXXXX"}', '提交資料(post)': '', '取值方式': 'json取值', '傳值變數': 'token', '取值程式碼': '$.access_token', '期望結果型別': '正則匹配', '期望結果': '{"access_token":"(.+?)","expires_in":(.+?)}'},
        {'請求方式': 'post', '請求地址': '/cgi-bin/tags/create', '請求引數(get)': '{"access_token":${token}}', '提交資料(post)': '{"tag" : {"name" : "衡東"}}','取值方式': '', '傳值變數': '', '取值程式碼': '', '期望結果型別': '正則匹配', '期望結果': '{"tag":{"id":(.+?),"name":"衡東"}}'}
    ]
    RequestsUtils().request_by_step(case_info)
View Code

五、斷言封裝(核心封裝)

check_utils.py  斷言封裝,與實際結果核對
import re
import ast

class CheckUtils():
    """Compare an HTTP response with the expectation declared for a test step.

    ``check_response`` must expose ``reason``, ``status_code``, ``headers``,
    ``text`` and ``json()``. Every check returns either ``pass_result``
    (code 0) or ``fail_result`` (code 2); on failure ``message`` says why.
    """

    def __init__(self,check_response=None):
        self.ck_response=check_response
        # Expectation type (as written in the case sheet) -> checking method.
        self.ck_rules = {
            '': self.no_check,
            'json鍵是否存在': self.check_key,
            'json鍵值對': self.check_keyvalue,
            '正則匹配': self.check_regexp
        }
        self.pass_result = self._build_result(0, True)
        self.fail_result = self._build_result(2, False)

    def _build_result(self, code, passed):
        """Return a result dict describing the response with the given verdict."""
        response = self.ck_response
        return {
            'code': code,
            'response_reason': response.reason,
            'response_code': response.status_code,
            'response_headers': response.headers,
            'response_body': response.text,
            'check_result': passed,
            'message': ''  # reserved for log output and similar extensions
        }

    def _fail(self, message):
        """Record ``message`` on ``fail_result`` and return it."""
        self.fail_result['message'] = message
        return self.fail_result

    def no_check(self, check_data=None):
        """Pass unconditionally.

        ``check_data`` is accepted and ignored because ``run_check`` calls
        every rule with one argument.
        """
        return self.pass_result

    def check_key(self,check_data=None):
        """Pass when every comma-separated key in ``check_data`` is a top-level JSON key."""
        actual_keys = self.ck_response.json().keys()  # parse the body once
        expected_keys = [key.strip() for key in check_data.split(',')]
        wrong_keys = [key for key in expected_keys if key not in actual_keys]
        if wrong_keys:
            return self._fail('json鍵不存在:%s' % ','.join(wrong_keys))
        return self.pass_result

    def check_keyvalue(self,check_data=None):
        """Pass when every key/value pair of the dict literal ``check_data`` is in the JSON body."""
        actual_items = self.ck_response.json().items()
        # literal_eval turns the cell text into a dict without executing code.
        expected_items = ast.literal_eval(check_data).items()
        wrong_items = [item for item in expected_items if item not in actual_items]
        if wrong_items:
            return self._fail('json鍵值對不匹配:%s' % (wrong_items,))
        return self.pass_result

    def check_regexp(self,check_data=None):
        """Pass when the regular expression ``check_data`` matches somewhere in the body text."""
        if re.search(check_data, self.ck_response.text):
            return self.pass_result
        return self._fail('正則表示式未匹配:%s' % check_data)

    def run_check(self,check_type=None,check_data=None):
        """Dispatch to the rule named ``check_type``.

        Fails without running a rule when the status code is not 200 or
        when ``check_type`` is not a known rule.
        """
        code = self.ck_response.status_code
        if code != 200:
            return self._fail('請求的響應狀態碼非200,實際為%s' % code)
        rule = self.ck_rules.get(check_type)
        if rule is None:
            return self._fail('不支援%s判斷方法'%check_type)
        return rule(check_data)




if __name__=="__main__":
    import json

    class _FakeResponse:
        """Minimal stand-in exposing the attributes CheckUtils reads from a response.

        CheckUtils accesses reason/status_code/headers/text/json(), so a bare
        dict or string cannot be passed to it directly.
        """

        def __init__(self, body):
            self.reason = 'OK'
            self.status_code = 200
            self.headers = {}
            self.text = json.dumps(body, separators=(',', ':'))
            self._body = body

        def json(self):
            return self._body

    # Key check: the body lacks "expires_in", so this is expected to fail.
    print(CheckUtils(_FakeResponse({"access_token":"hello","expires_":7200})).check_key("access_token,expires_in"))
    # Key/value check: the body lacks "expires_in", so this is expected to fail.
    print(CheckUtils(_FakeResponse({"access_token":"hello","expires_i":7200})).check_keyvalue('{"expires_in": 7200}'))
    # Regular-expression check, expected to pass.
    print(CheckUtils(_FakeResponse({"access_token":"hello","expires_in":7200})).check_regexp('"expires_in":(.+?)'))
    # Regular-expression check, expected to fail.
    print(CheckUtils(_FakeResponse({"access_token":"hello","expires":7200})).check_regexp('"expires_in":(.+?)'))
View Code

六、api_testcase下的api_test.py 封裝

import warnings
import unittest
import paramunittest
from common.testdata_utils import TestdataUtils
from common.requests_utils import RequestsUtils

# Data-source switch: use def_testcase_data_list_by_mysql() for the MySQL source, def_testcase_data_list() for the Excel source.
# NOTE(review): confirm that TestdataUtils actually provides def_testcase_data_list_by_mysql() before relying on it.

case_infos = TestdataUtils().def_testcase_data_list_by_mysql()

@paramunittest.parametrized(*case_infos)
class APITest(paramunittest.ParametrizedTestCase):
    """One parametrized test per case record produced by TestdataUtils."""

    def setUp(self) -> None:
        # Silence ResourceWarning noise while the case runs.
        warnings.simplefilter('ignore', ResourceWarning)

    def setParameters(self, case_id, case_info):
        """Receive the ``case_id`` / ``case_info`` pair of one case record."""
        self.case_id, self.case_info = case_id, case_info

    def test_api_common_function(self):
        """Run all steps of the case and assert that the final check passed."""
        first_step = self.case_info[0]
        # Overwrite the test's name and description with the case id and
        # case title taken from the first step.
        self._testMethodName = first_step.get("測試用例編號")
        self._testMethodDoc = first_step.get("測試用例名稱")
        outcome = RequestsUtils().request_by_step(self.case_info)
        passed = outcome.get('check_result')
        failure_message = outcome.get('message')
        self.assertTrue(passed, failure_message)

if __name__ == '__main__':
    # Allow running this module directly: unittest collects and runs the tests defined above.
    unittest.main()
View Code

七、common下的log_utils.py 封裝

import os
import logging
import time
from common.config_utils import config

# Directory that contains this module.
current_path = os.path.dirname(__file__)
# Default log directory: config.LOG_PATH joined onto this module's parent directory.
log_output_path = os.path.join( current_path,'..', config.LOG_PATH  )

class LogUtils():
    """Configure the "ApiTestLog" logger with console and daily-file output."""

    def __init__(self,log_path=log_output_path):
        """Prepare the logger.

        :param log_path: directory that receives ``ApiTest_<YYYY_MM_DD>.log``;
            it is created when missing.
        """
        # Use the directory the caller asked for (the parameter used to be
        # ignored in favour of the module default).
        os.makedirs(log_path, exist_ok=True)
        self.log_name = os.path.join( log_path ,'ApiTest_%s.log'%time.strftime('%Y_%m_%d') )
        self.logger = logging.getLogger("ApiTestLog")
        self.logger.setLevel( config.LOG_LEVEL )

        # logging.getLogger returns the same object for the same name, so
        # attaching handlers on every instantiation would print each record
        # once per instance. Attach them only the first time.
        if not self.logger.handlers:
            formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
            console_handler = logging.StreamHandler()  # console output
            file_handler = logging.FileHandler(self.log_name,'a',encoding='utf-8')  # file output
            for handler in (console_handler, file_handler):
                handler.setFormatter(formatter)
                self.logger.addHandler(handler)

    def get_logger(self):
        """Return the configured logger."""
        return self.logger

logger = LogUtils().get_logger()   # module-level logger created once here; the author's stated intent is to avoid duplicated log output

if __name__ == '__main__':
    # Manual smoke run: emit one INFO record through the configured handlers.
    logger.info('hello')
View Code

八、common下的config_utils.py的封裝

配置檔案的編寫:

對配置檔案的讀取封裝:

import  os
import configparser

# Default configuration file: ../conf/local_config.ini relative to this module.
current_path = os.path.dirname(__file__)
cfgpath = os.path.join(current_path, "../conf/local_config.ini")
print(cfgpath)  # NOTE(review): printed on every import - looks like a debugging leftover, confirm before removing


class ConfigUtils:
    """Typed accessors for the options stored in the framework's ini file."""

    def __init__(self,config_path=cfgpath):
        """Parse ``config_path`` as a UTF-8 encoded ini file."""
        parser = configparser.ConfigParser()
        parser.read(config_path, encoding="utf-8")
        self.__conf = parser

    def read_ini(self,sec,option):
        """Return option ``option`` of section ``sec`` as a string."""
        return self.__conf.get(sec,option)

    @property
    def hosts(self):
        """Option ``hosts`` of section ``default``."""
        return self.read_ini('default','hosts')

    @property
    def LOG_PATH(self):
        """Option ``LOG_PATH`` of section ``path``."""
        return self.read_ini('path', 'LOG_PATH')

    @property
    def CASE_DATA_PATH(self):
        """Option ``CASE_DATA_PATH`` of section ``path``."""
        return self.read_ini('path', 'CASE_DATA_PATH')

    @property
    def REPORT_PATH(self):
        """Option ``REPORT_PATH`` of section ``path``."""
        return self.read_ini('path', 'REPORT_PATH')

    @property
    def LOG_LEVEL(self):
        """Option ``LOG_LEVEL`` of section ``log``, converted to ``int``."""
        return int(self.read_ini('log', 'LOG_LEVEL'))

    @property
    def smtp_server(self):
        """Option ``smtp_server`` of section ``email``."""
        return self.read_ini('email', 'smtp_server')

    @property
    def smtp_sender(self):
        """Option ``smtp_sender`` of section ``email``."""
        return self.read_ini('email', 'smtp_sender')

    @property
    def smtp_password(self):
        """Option ``smtp_password`` of section ``email``."""
        return self.read_ini('email', 'smtp_password')

    @property
    def smtp_receiver(self):
        """Option ``smtp_receiver`` of section ``email``."""
        return self.read_ini('email', 'smtp_receiver')

    @property
    def smtp_cc(self):
        """Option ``smtp_cc`` of section ``email``."""
        return self.read_ini('email', 'smtp_cc')

    @property
    def smtp_subject(self):
        """Option ``smtp_subject`` of section ``email``."""
        return self.read_ini('email', 'smtp_subject')


# Module-level instance built from the default configuration file; other modules import it as `config`.
config=ConfigUtils()


if __name__=='__main__':
    # Manual smoke run: print two options read from the default configuration file.
    # The two path assignments repeat the module-level values and are not passed to ConfigUtils() below.
    current_path = os.path.dirname(__file__)
    cfgpath = os.path.join(current_path, "../conf/local_config.ini")
    config_u=ConfigUtils()
    print(config_u.hosts)
    print(config_u.LOG_LEVEL)
View Code

九、test_runner下的run_case.py 封裝

class RunCase():
    """Discover the API test module, run it and write an HTML report.

    NOTE(review): ``test_case_path``, ``test_report_path``, ``unittest`` and
    ``HTMLTestReportCN`` are not defined in the visible part of this module;
    they are presumably imported/defined above - confirm.
    """

    def __init__(self):
        self.test_case_path = test_case_path
        self.report_path = test_report_path
        self.title = 'P1P2介面自動化測試報告'
        self.description = '自動化介面測試框架'
        self.tester = '測試開發組'

    def load_test_suite(self):
        """Return a suite with every test found in ``api_test.py`` under the case directory."""
        discover = unittest.defaultTestLoader.discover(start_dir=self.test_case_path,
                                                       pattern='api_test.py',
                                                       top_level_dir=self.test_case_path)
        all_suite = unittest.TestSuite()
        all_suite.addTest( discover )
        return all_suite

    def run(self):
        """Run the suite, write the HTML report and return the report file path."""
        report_dir = HTMLTestReportCN.ReportDirectory(self.report_path)
        report_dir.create_dir(self.title)
        report_file_path = HTMLTestReportCN.GlobalMsg.get_value('report_path')
        # The context manager closes the report file even when the run raises.
        with open( report_file_path ,'wb' ) as fp:
            runner = HTMLTestReportCN.HTMLTestRunner(stream=fp,
                                                     title=self.title,
                                                     description=self.description,
                                                     tester=self.tester)
            runner.run( self.load_test_suite() )
        return report_file_path


if __name__=='__main__':
    # Run all cases, then mail the generated report both as the message body and as an attachment.
    report_path = RunCase().run()
    # Read the report inside a context manager so the handle is closed before the mail is sent.
    with open(report_path, 'rb') as report_file:
        report_body = report_file.read()
    EmailUtils(report_body, report_path).send_mail()
View Code

十、common下的email_utils.py 封裝

import os
import smtplib
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

from common.config_utils import config

class EmailUtils():
    """Build and send the test-report e-mail using the SMTP settings from ``config``."""

    def __init__(self,smtp_body,smtp_attch_path=None):
        """
        :param smtp_body: HTML content used as the message body.
        :param smtp_attch_path: optional path of a file to attach.
        """
        self.smtp_server = config.smtp_server
        self.smtp_sender = config.smtp_sender
        self.smtp_password = config.smtp_password
        self.smtp_receiver = config.smtp_receiver
        self.smtp_cc = config.smtp_cc
        self.smtp_subject = config.smtp_subject
        self.smtp_body = smtp_body
        self.smtp_attch = smtp_attch_path

    @staticmethod
    def _split_addresses(raw):
        """Split a comma-separated address string, dropping blanks."""
        return [address.strip() for address in (raw or '').split(',') if address.strip()]

    def _recipient_list(self):
        """Return every envelope recipient: the "to" addresses followed by the "cc" ones."""
        return self._split_addresses(self.smtp_receiver) + self._split_addresses(self.smtp_cc)

    def mail_message_body(self):
        """Return the multipart message: HTML body plus the optional attachment."""
        message = MIMEMultipart()
        message['from'] = self.smtp_sender
        message['to'] = self.smtp_receiver
        if self._split_addresses(self.smtp_cc):
            # Only emit a Cc header when there is at least one address.
            message['Cc'] = self.smtp_cc
        message['subject'] = self.smtp_subject
        message.attach( MIMEText(self.smtp_body,'html','utf-8') )
        if self.smtp_attch:
            with open(self.smtp_attch, 'rb') as attachment:
                payload = attachment.read()
            # MIMEApplication base64-encodes the bytes and sets a single
            # "application/octet-stream" Content-Type header.
            attach_file = MIMEApplication(payload, 'octet-stream')
            attach_file.add_header('Content-Disposition', 'attachment', filename=('gbk', '', os.path.basename(self.smtp_attch)))
            message.attach(attach_file)
        return message

    def send_mail(self):
        """Log in to the SMTP server and send the message to all recipients."""
        # The context manager issues QUIT and closes the socket afterwards.
        with smtplib.SMTP() as smtp:
            smtp.connect(self.smtp_server)
            smtp.login(user=self.smtp_sender, password=self.smtp_password)
            smtp.sendmail(self.smtp_sender, self._recipient_list(), self.mail_message_body().as_string())

if __name__=='__main__':
    # Manual demo: mail a fixed HTML heading with a previously generated report attached.
    html_path = os.path.dirname(__file__) + '/../test_reports/介面自動化測試報告V1.1/介面自動化測試報告V1.1.html'
    EmailUtils('<h3 align="center">自動化測試報告</h3>',html_path).send_mail()
View Code