1. 程式人生 > 實用技巧 >Locust效能測試 --headless 模式下在 Grafana 展示聚合報告

Locust效能測試 --headless 模式下在 Grafana 展示聚合報告

locust版本:1.1.1

參考地址:https://www.jianshu.com/p/c6802a88beaf

locust 效能指令碼coupon_test.py

from locust import task, between, tag
from locust.contrib.fasthttp import FastHttpUser
from public.common import setup_request, store, coupon # 引數化函式、加密處理
from stats.listener import * # hook函式


class MyUser(FastHttpUser):
    host 
= "" wait_time = between(0, 0) @tag("getCustomerCouponList") @task def get_customer_coupon_list(self): data = {"status": 0, "pageSize": 15, "pageIndex": 1} req = {"body": data, "params": {}, "headers": {}} req = setup_request(req, self.host) with self.client.post(
"/api/coupons/app/getCustomerCouponList", headers=req["headers"], json=req["body"], name="/api/coupons/app/getCustomerCouponList", catch_response=True) as res: try: response = res.json() except Exception as e: logger.error(
"=======/api/coupons/app/getCustomerCouponList=== Exception ====>>>> {} >>>> {}".format(res.text, e)) res.failure("json error!") return try: if response["success"] and response["message"] == "請求成功": res.success() else: logger.error("=======/api/coupons/app/getCustomerCouponList=== 斷言 ====>>>> {}".format(response)) res.failure("斷言失敗!") except KeyError as e: logger.error("=======/api/coupons/app/getCustomerCouponList=== KeyError ====>>>> {} >>>> {}".format(res.text, e)) res.failure("key error!")
  ......

--headless模式執行命令

locust -f testcases\coupon\coupon_test.py --headless -u 10 -r 3 -t 1m  -H https://test-varian.quanqiuwa.com --logfile=log\locust.log --loglevel=INFO 1>log\run.log 2>&1 --master

生成run.log檔案格式

讀取日誌檔案並寫入 influxdb 資料庫


#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020-07-23 10:32
# @Author : lixiaofeng
# @Site :
# @File : helper.py
# @Software: PyCharm
import time
import osimport jsonimport re
import platform
from io import StringIO
from public.connect_influxdb import TestInflux

def read_performance(t):
    """
    讀取日誌中的效能資料
    :param t: 時間 time.time()
    :return:
    """
    t1 = h.getvalue()
    if t1 and t - float(t1[:18]) < 5:  # 5秒寫入一次資料庫
        return
    h.seek(0)
    h.write(str(t))
    performance_path = os.path.join(BASE_DIR, "log" + pattern + "run.log")
    with io.open(performance_path) as f:
        data_list = f.readlines()
        locust_list = []
        i = 0
        for data in data_list:
            res = re.match(
                r'^\s+(?P<method>GET|POST)\s+(?P<api>[\/\w\?\=\-&]+)\s+(?P<reqs>\d+)\s+(?P<fails>[\d\(\.\)\%]+)\s+(?P<Avg>\d+)\s+(?P<Min>\d+)\s+(?P<Max>\d+)\s+(\|)\s+(?P<Median>\d+)\s+(?P<qps>[\d\(\.\)\%]+)\s+(?P<failures>[\d\(\.\)\%]+)$',
                data) # 正則,匹配資料
            if res:
                i += 1
                # print(res.group('method'), res.group('api'), res.group('reqs'), res.group('fails'), res.group('Avg'),
                #       res.group('Min'), res.group('Max'), res.group('Median'), res.group('qps'), res.group('failures'))
                method = res.group('method')
                api = res.group('api')
                reqs = res.group('reqs')
                fails = res.group('fails')
                Avg = res.group('Avg')
                Min = res.group('Min')
                Max = res.group('Max')
                Median = res.group('Median')
                qps = res.group('qps')
                failures = res.group('failures')
                locust_dict = {'Type': method, 'Name': api, 'Requests': reqs, 'Fails': fails, 'Average_ms': Avg,
                               'Min_ms': Min,
                               'Max_ms': Max, 'Median_ms': Median, 'Current_RPS': qps, 'Failures_s': failures}
                locust_list.append(locust_dict)
        influx.post_dump_data(locust_list)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020-07-31 13:21
# @Author : lixiaofeng
# @Site :
# @File : connect_influxdb.py
# @Software: PyCharm
from influxdb import InfluxDBClient


class TestInflux:
    """
    連線 influxdb ,並寫入資料
    """

    def __init__(self):
        self.influx_client = InfluxDBClient('localhost', 8086, '', '', 'mydb')
        self.make = False
        for database in self.influx_client.get_list_database():
            if "mydb" not in database["name"]:
                self.make = True
            else:
                # self.influx_client.drop_measurement("locust") # 寫入前刪除資料
                self.make = False
        if self.make:
            self.influx_client.create_database("mydb")

    def post_dump_data(self, data):
        """
        :param data:  資料 list
        :return:
        """
        if isinstance(data, list):
            for key in data:
                json_body = [
                    {
                        "measurement": "locust",
                        # "tags": key,
                        "fields": key
                    }
                ]
                self.influx_client.write_points(json_body)

對hook函式中對read_performance 函式進行呼叫

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020-07-29 11:20
# @Author  : lixiaofeng
# @Site    : 
# @File : listener.py
# @Software: PyCharm


import time
from locust import events
from loguru import logger
from public.helper import read_performance


@events.quitting.add_listener
def process_exit(environment, **kwargs):
    """
    locust 退出時呼叫
    :param environment:
    :param kwargs:
    :return:
    """
    if environment.stats.total.fail_ratio > 0.01:
        logger.error("Test failed due to failure ratio > 1%")
        environment.process_exit_code = 1
    elif environment.stats.total.avg_response_time > 200:
        logger.error("Test failed due to average response time ratio > 200 ms")
        environment.process_exit_code = 1
    elif environment.stats.total.get_response_time_percentile(0.95) > 800:
        logger.error("Test failed due to 95th percentile response time > 800 ms")
        environment.process_exit_code = 1
    else:
        environment.process_exit_code = 0
    read_performance(time.time())


@events.request_success.add_listener
def success(request_type, name, response_time, response_length, **kwargs):
    """
    介面 請求成功時呼叫
    :param request_type:
    :param name:
    :param response_time:
    :param response_length:
    :param kwargs:
    :return:
    """
    read_performance(time.time())


@events.request_failure.add_listener
def failure(request_type, name, response_time, response_length, **kwargs):
    """
    介面 請求失敗時呼叫
    :param request_type:
    :param name:
    :param response_time:
    :param response_length:
    :param kwargs:
    :return:
    """
    read_performance(time.time())

influxdb插入的資料

配置Grafana<安裝步驟網上有詳細教程,不在贅述>

設定influxdb資料來源

面板中選擇 視覺化 table 展示,使用sql查詢資料

select Type,"Name",Requests,Fails,Average_ms,Min_ms,Max_ms,Median_ms,Current_RPS,Failures_s from locust order by time desc  limit 3

和web模式對比

~~~【完】