利用python對websocket進行併發壓測
阿新 • • 發佈:2022-03-17
簡述
產品經理鑑於運營反饋並對程式的websocket長連線保持懷疑的態度,讓我對websocket伺服器進行壓力測試,我內心是拒絕的。
開發思路
查閱websocket的相關資料,查到python的websocket有兩個連線方式,長連線:WebSocketApp,短連線:create_connection。
使用 WebSocketApp 的話,我沒辦法獲取websocket伺服器端返回的資料,這個我還在研究,這裡使用 create_connection 來進行壓測。
連線:
1 ws = create_connection("websocket的伺服器地址")
獲取連線狀態:
1print("獲取連線狀態:", ws.connected)
傳送訊息至伺服器並接收返回結果:
1 initialize_data = '{"code":2001, "msg": "","data": ""}' 2 ws.send(initialize_data) 3 result_initialize = ws.recv() # 獲取返回結果 4 print("接收結果:", result_initialize)
因為create_connection為短連線,所以這一系列下來後連線會斷開,所以這個時候需要一個東西來維持連線,長連線叫心跳,那短連線這裡我願成為:心臟起搏器(while True)
完整程式碼:
1 import threading 2 from websocket import create_connection 3 import time 4 5 thread = [] 6 def socket(): 7 while True: 8 ws = create_connection("websocket伺服器地址") 9 # 2、獲取連線狀態 10 print("獲取連線狀態:", ws.connected) 11 print("獲取伺服器返回的連線結果", ws.recv())12 if ws.connected == True: 13 initialize = '{"code":2001, "msg": "","data": {}}' 14 ws.send(initialize) 15 result_initialize = ws.recv() # 獲取返回結果 16 print("接收結果:", result_initialize) 17 if __name__ == "__main__": 18 for i in range(25): 19 t = threading.Thread(target=socket, args=()) 20 t.start()