1. 程式人生 > 其它 > locust websocket 測試購買demo

locust websocket 測試購買demo

locust websocket 測試購買demo
from locust import User, task, events, constant
import time,requests
import websocket
import ssl
import json
import jsonpath
import json


def eventType_success(eventType, recvText, total_time):
    """Report a successfully received message to Locust's statistics.

    Args:
        eventType: Label used as the entry name in the Locust report.
        recvText: The received payload; its length is reported as the
            response length.
        total_time: Elapsed time to report, passed through unchanged as
            ``response_time``.
    """
    # NOTE(review): newer Locust releases replace request_success /
    # request_failure with a single `request` event -- confirm against the
    # installed Locust version before upgrading.
    events.request_success.fire(
        request_type="[RECV]",
        name=eventType,
        response_time=total_time,
        response_length=len(recvText),
    )


class WebSocketClient(object):
    """Thin wrapper around ``websocket.WebSocket`` that reports connection
    attempts to Locust through the request success/failure events."""

    _locust_environment = None

    def __init__(self, host):
        self.host = host
        # Result of the last successful connect(); initialised here so that
        # connect() can always return it, even when the first attempt fails.
        self.conn = None
        # Disable SSL certificate verification for WSS endpoints.
        self.ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})

    @staticmethod
    def _elapsed_ms(start_time):
        """Return whole milliseconds elapsed since *start_time*."""
        return int((time.time() - start_time) * 1000)

    def _report_connect_failure(self, name, start_time, exc):
        """Fire a Locust failure event for a connection attempt."""
        events.request_failure.fire(
            request_type="[Connect]",
            name=name,
            response_time=self._elapsed_ms(start_time),
            # The failure event takes a response length as well; nothing was
            # received on a failed connect, so report zero.
            response_length=0,
            exception=exc,
        )

    def connect(self, burl):
        """Open a connection to *burl* and report the outcome to Locust.

        Closed-connection and timeout errors raised by websocket-client are
        caught and reported as failures; any other exception propagates to
        the caller.

        Args:
            burl: URL handed to ``WebSocket.connect``.

        Returns:
            Whatever ``WebSocket.connect`` returned on the most recent
            successful attempt, or ``None`` if no attempt has succeeded.
        """
        start_time = time.time()
        try:
            self.conn = self.ws.connect(url=burl)
        except websocket.WebSocketConnectionClosedException as e:
            self._report_connect_failure(
                'Connection is already closed', start_time, e)
        except websocket.WebSocketTimeoutException as e:
            self._report_connect_failure('TimeOut', start_time, e)
        else:
            events.request_success.fire(
                request_type="[Connect]",
                name='WebSocket',
                response_time=self._elapsed_ms(start_time),
                response_length=0,
            )
        return self.conn

    def recv(self):
        """Return the next message read from the underlying socket."""
        return self.ws.recv()

    def send(self, msg):
        """Send *msg* over the underlying socket."""
        self.ws.send(msg)


class WebsocketUser(User):
    """Abstract Locust user whose ``client`` is a ``WebSocketClient``."""

    abstract = True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client = WebSocketClient(self.host)
        self.client._locust_environment = self.environment


def getid():
    """Fetch a user id from the helper HTTP API.

    Returns:
        The ``"data"`` field of the JSON response.
    """
    payload = requests.get(
        'http://192.168.0.153:8000/api/user/Getuserid/').json()
    return payload["data"]


def getuserid(userid):
    """Fetch a token for *userid* from the helper HTTP API.

    Args:
        userid: Sent as the ``id`` query parameter.

    Returns:
        The ``"data"`` field of the JSON response.
    """
    token = requests.get(
        'http://192.168.0.153:8000/api/user/GetRoboToken/',
        params={"id": userid, "level": 5},
    ).json()
    return token["data"]


class ApiUser(WebsocketUser):
    """Locust user that connects with a fresh token and sends one purchase
    request per task run."""

    host = "wss://ws.xxxxx.com/"
    wait_time = constant(0)

    @task()
    def ws(self):
        """Obtain a token, connect to the market endpoint and send the order."""
        userid = getid()
        token = getuserid(userid)
        # WebSocket endpoint address; the token is passed as a query parameter.
        self.url = 'ws://47.243.58.228:8000/api/Primarymarket/ws?token=' + token
        self.data = {}
        self.client.connect(self.url)
        # NOTE(review): the socket is never closed and each task run connects
        # again on the same WebSocket object -- confirm whether an explicit
        # close is wanted here.
        # Request message to send.
        sendMsg = '{"Number":1,"Cardcaseid":1}'
        self.client.send(sendMsg)