python+locust效能測試學習筆記
阿新 • • 發佈:2020-07-17
前言
Locust(俗稱 蝗蟲)一個輕量級的開源壓測工具,基本功能是用Python程式碼描述所有測試。不需要笨拙的UI或龐大的XML,只需簡單的程式碼即可。
環境安裝
Locust支援Python 2.7, 3.4, 3.5, and 3.6的版本,小編的環境是python3.6直接用pip安裝就行
安裝命令:pip install locustio
官方文件
開始第一個例項
from locust import HttpLocust, TaskSet, task
class Testlocust(TaskSet):
def on_start(self):
print("start")
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'
}
@task(1)
def baidu_demo(self):
r = self.client.get("/", headers=self.headers, verify=False)
print(r.status_code)
assert r.status_code == 200
class WebsiteUser(HttpLocust):
task_set = Testlocust
min_wait = 1500
max_wait = 5000
if __name__ == "__main__":
import os
os.system("locust -f locust4.py --host=https://www.baidu.com")
- 使用@task裝飾的方法為一個事務,方法的引數用於指定該行為的執行權重,引數越大每次被使用者執行的概率越高,預設為1;
- on_start():每個locust使用者執行測試事務之前執行一次,用於做初始化的工作,如登入;
- host :要載入主機的URL,通常是在命令列啟動locust時使用--host選項指定,若命令列啟動時未指定,該屬性被使用;
- task_set:指向定義的一個使用者行為類;
- min_wait:模擬使用者在執行每個任務之間等待的最小時間,單位為毫秒;
- max_wait:模擬使用者在執行每個任務之間等待的最大時間,單位為毫秒
啟動locust
web模式啟動:os.system("locust -f locust4.py --host= https://www.baidu.com")
啟動成功:
在瀏覽器中輸入:http://localhost:8089/出現如下圖說明啟動成功
測試結果:
no-web模式啟動:os.system("locust -f locust4.py --host=https://www.baidu.com--no-web --csv=example -c 100 -r 10 -t 10s")
測試結果:
開始第二個例項壓kafka
import time from locust import TaskSet, task, Locust, events from kafka import KafkaProducer import json class UserBehavior(TaskSet): def on_start(self): self.producer = KafkaProducer(bootstrap_servers=['x.x.x.x:9092']) def on_stop(self): # 該方法當程式結束時每使用者進行呼叫,關閉連線 self.producer.close() @task(1) def sendAddCmd(self): start_time = time.time() time_unique_id = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) print(time_unique_id) print("===========================",start_time) try: timestamp = int(time.time()) message = { 'timestamp': timestamp, 'message': "121314" } msg = json.dumps(message) msg = msg.encode('utf-8') self.producer.send('preHandleTopic', msg) except Exception as e: total_time = int((time.time() - start_time) * 1000) events.request_failure.fire(request_type="kafka", name="add", response_time=total_time, response_length=0, exception=e) else: total_time = int((time.time() - start_time) * 1000) events.request_success.fire(request_type="kafka", name="add", response_time=total_time, response_length=0) class SocketUser(Locust): task_set = UserBehavior min_wait = 1000 # 單位毫秒 max_wait = 1000 # 單位毫秒 if __name__ == '__main__': import os os.system("locust -f SendKafka.py --host=x.x.x.x:9092"
啟動方式跟例項一相同
開始第三個例項壓tcp
from locust import HttpLocust, TaskSet, task
import socket # 匯入 socket 模組
host = socket.gethostname() # 獲取本地主機名
port = 8888 # 設定埠號
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
class UserBehaviora(TaskSet):
def on_start(self):
print("start")
@task(1)
def bky_index(self):
sock.send(
b'121314')
re_mes = sock.recv(1024).decode()
print(re_mes)
class WebsiteUser(HttpLocust):
task_set = UserBehaviora
min_wait = 1500
max_wait = 5000
if __name__ == "__main__":
import os
os.system("locust -f locust6.py --host=x.x.x.x:xxxx")
啟動方式跟例項一相同
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家的支援。
小編微信公眾號:自動化測試 To share