1. 程式人生 > 其它 >利用python對websocket進行併發壓測

利用python對websocket進行併發壓測

簡述

產品經理鑑於運營反饋並對程式的websocket長連線保持懷疑的態度,讓我對websocket伺服器進行壓力測試,我內心是拒絕的。

 

開發思路

查閱websocket的相關資料,查到python的websocket有兩個連線方式,長連線:WebSocketApp,短連線:create_connection。

使用 WebSocketApp 的話,我沒辦法獲取websocket伺服器端返回的資料,這個我還在研究,這裡使用 create_connection 來進行壓測。

連線:

1 ws = create_connection("websocket的伺服器地址")

獲取連線狀態:

1
print("獲取連線狀態:", ws.connected)

傳送訊息至伺服器並接收返回結果:

1 initialize_data = '{"code":2001, "msg": "","data": ""}'
2 ws.send(initialize_data)
3 result_initialize = ws.recv()        # 獲取返回結果
4 print("接收結果:", result_initialize)

因為create_connection為短連線,所以這一系列下來後連線會斷開,所以這個時候需要一個東西來維持連線,長連線叫心跳,那短連線這裡我願成為:心臟起搏器(while True)

完整程式碼:

 1 import threading
 2 from websocket import create_connection
 3 import time
 4 
 5 thread = []
 6 def socket():
 7     while True:
 8         ws = create_connection("websocket伺服器地址")
 9         # 2、獲取連線狀態
10         print("獲取連線狀態:", ws.connected)
11         print("獲取伺服器返回的連線結果", ws.recv())
12 if ws.connected == True: 13 initialize = '{"code":2001, "msg": "","data": {}}' 14 ws.send(initialize) 15 result_initialize = ws.recv() # 獲取返回結果 16 print("接收結果:", result_initialize) 17 if __name__ == "__main__": 18 for i in range(25): 19 t = threading.Thread(target=socket, args=()) 20 t.start()