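Pitfalls Hit While Stress Testing WebSocket Long-Lived Connections
WebSocket protocol stress test notes
Background:
Our company's market data system uses the WebSocket protocol. Clients can obtain the latest quotes from the server in two ways: by request or by subscription. A request is one-shot; with a subscription, once the connection is established the server pushes quotes to the client at regular intervals.
Initial test plan:
WebSocket is full-duplex and long-lived, and the performance metric for this test was the maximum number of connections the system can hold while the server keeps pushing quotes to every connected client.
For those reasons I decided to open the connections from Python using multiple threads and, to verify that the pushed messages actually arrive, to save the returned quotes to a text file. The Python script is as follows: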
import gzip
import threading
import time

import websocket  # provided by the websocket-client package
# import json
# from threadpool import ThreadPool, makeRequests
# from websocket import create_connection

SERVER_URL = "ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws"
# SERVER_URL = "wss://i.cg.net/wi/ws"
# SERVER_URL = "wss://www.exshell.com/r1/main/ws"


# The three callbacks below are only used by the commented-out WebSocketApp variant.
def on_message(ws, message):
    print(message)


def on_error(ws, error):
    print(error)


def on_close(ws, close_status_code=None, close_msg=None):
    print("### closed ###")


def on_open(ws):
    def send_thread():
        send_info = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'
        # send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'
        while True:
            # time.sleep(5)
            # ws.send(json.dumps(send_info))
            ws.send(send_info)
            while True:
                # Every frame from the server is gzip-compressed.
                compress_data = ws.recv()
                result = gzip.decompress(compress_data).decode('utf-8')
                if result[:7] == '{"ping"':
                    # Heartbeat: echo the 13-digit timestamp back as a pong.
                    ts = result[8:21]
                    pong = '{"pong":' + ts + '}'
                    ws.send(pong)
                    ws.send(send_info)
                else:
                    # print(result)
                    with open('./test_result.txt', 'a') as f:
                        f.write(threading.current_thread().name + '\n')
                        f.write(result + '\n')

    # One receiver thread per connection.
    t = threading.Thread(target=send_thread)
    t.start()
    print(threading.current_thread().name)


def on_start(a):
    # time.sleep(2)
    # websocket.enableTrace(True)
    # ws = websocket.WebSocketApp(SERVER_URL,
    #                             on_message=on_message,
    #                             on_error=on_error,
    #                             on_close=on_close)
    # ws.on_open = on_open
    # ws.run_forever()
    # print(a[2])
    try:
        ws = websocket.create_connection(SERVER_URL)
        on_open(ws)
    except Exception as e:
        print('error is :', e)
        print('connect ws error,retry...')
        time.sleep(5)


if __name__ == "__main__":
    # pool = ThreadPool(3)
    # test = list()
    # for ir in range(3):
    #     test.append(ir)
    #
    # requests = makeRequests(on_start, test)
    # [pool.putRequest(req) for req in requests]
    # pool.wait()
    # on_start(1)
    for ir in range(20):
        on_start(1)
        time.sleep(0.1)
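The script answers heartbeats by slicing fixed character positions out of the decompressed text, which only works while the timestamp is exactly 13 digits and the message has no extra whitespace. As a minimal sketch of a sturdier alternative, the helper below parses the frame as JSON instead. It assumes the heartbeat has the shape `{"ping": <timestamp>}`, which is inferred from the slicing above rather than from any server documentation; `handle_frame` is a hypothetical name.

```python
import gzip
import json


def handle_frame(compressed):
    """Decode one gzip-compressed frame.

    Returns (pong_text, payload): pong_text is the reply to send for a
    heartbeat (otherwise None); payload is the decoded message for
    anything that is not a heartbeat (otherwise None).
    """
    message = json.loads(gzip.decompress(compressed).decode("utf-8"))
    if isinstance(message, dict) and "ping" in message:
        # Assumed heartbeat shape: {"ping": 1530000000000}
        pong = json.dumps({"pong": message["ping"]}, separators=(",", ":"))
        return pong, None
    return None, message
```

In the receive loop this would be used as `pong, payload = handle_frame(ws.recv())`, followed by `ws.send(pong)` when `pong` is not None.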
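Initial test results:
During the test, once the number of connections reached a certain level (about 1,400 on a single machine), connections started to drop. Monitoring showed that the load generator's memory was almost exhausted: every connection stays open and keeps receiving pushed data, so memory use grows with the connection count until connections have to be dropped to free memory.
Revised test plan:
After discussing with the architects and developers, we decided to have the WebSocket client use asynchronous I/O (AIO) so that one machine could generate more load. I had not considered this at first because I was focused on the long-lived connections. Research showed that WebSocket servers commonly use asynchronous I/O, so I tried the same idea on the client side with Locust + Python, and further reading suggested the approach was feasible.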
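To illustrate why an asynchronous client can hold many connections on one machine, here is a minimal sketch using only the standard library's `asyncio`. It is not the Locust setup used in this test and does not speak WebSocket: a local TCP server stands in for the quote server and pushes three lines to each client. The names `push_server`, `client`, and `simulate` are hypothetical. The point is that each simulated subscriber is a coroutine on a single thread rather than a thread of its own.

```python
import asyncio


async def push_server(reader, writer):
    # Stand-in for the quote server: push three lines, then close.
    for i in range(3):
        writer.write(f"tick {i}\n".encode())
        await writer.drain()
    writer.close()


async def client(host, port):
    # One simulated subscriber, implemented as a coroutine.
    reader, writer = await asyncio.open_connection(host, port)
    received = 0
    while await reader.readline():  # b"" at end of stream ends the loop
        received += 1
    writer.close()
    await writer.wait_closed()
    return received


async def run(num_clients):
    # Port 0 lets the OS pick a free port on the loopback interface.
    server = await asyncio.start_server(push_server, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        return await asyncio.gather(
            *(client("127.0.0.1", port) for _ in range(num_clients))
        )


def simulate(num_clients=50):
    """Return how many pushed lines each simulated client received."""
    return asyncio.run(run(num_clients))
```

Calling `simulate(50)` runs fifty concurrent connections in one thread; Locust reaches a similar effect through gevent greenlets.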
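Locust is an open-source, scalable, distributed performance testing tool written in Python. It is most convenient for testing HTTP interfaces, but it can also test other protocols, provided you write your own Locust class. The script is as follows: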
# Written against the pre-1.0 Locust API (Locust, TaskSet, min_wait/max_wait).
from locust import Locust, events, task, TaskSet
import websocket
import time
import gzip


class WebSocketClient():
    def __init__(self, host, port):
        self.host = host
        self.port = port


class WebSocketLocust(Locust):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client = WebSocketClient("172.31.15.85", 9503)


class UserBehavior(TaskSet):
    # Created once, when the class body is evaluated,
    # so every simulated user shares this one socket.
    ws = websocket.WebSocket()
    # self.ws.connect("ws://10.98.64.103:8807")
    ws.connect("ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws")

    @task(1)
    def buy(self):
        ws = self.ws  # the shared class-level socket
        try:
            start_time = time.time()
            # self.ws.send('{"url":"/buy","data":{"id":"123","issue":"20170822","doubled_num":2}}')
            # result = self.ws.recv()
            send_info = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'
            # send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'
            while True:
                # time.sleep(5)
                # ws.send(json.dumps(send_info))
                ws.send(send_info)
                while True:
                    compress_data = ws.recv()
                    result = gzip.decompress(compress_data).decode('utf-8')
                    if result[:7] == '{"ping"':
                        # Heartbeat: echo the timestamp back as a pong.
                        ts = result[8:21]
                        pong = '{"pong":' + ts + '}'
                        ws.send(pong)
                        ws.send(send_info)
                    else:
                        # print(result)
                        with open('./test_result.txt', 'a') as f:
                            # f.write(threading.currentThread().name + '\n')
                            f.write(result + '\n')
        except Exception as e:
            print("error is:", e)


class ApiUser(WebSocketLocust):
    task_set = UserBehavior
    min_wait = 100
    max_wait = 200
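Run the script from the command line:
locust -f websocket_client_locust.py --no-web -c 1 -r 1 -t 60s
With a single user the script ran successfully and produced the output file. With multiple users it failed with the following error: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0x7f01f0594900>>
The error says the socket is already in use. At first glance my code seemed to create a new socket, so there should have been no problem. In fact the socket was a single global client shared by the whole program, and that is what triggered the error. After thinking it through I found the cause:
gevent is a coroutine library, and this error is raised when several coroutines (greenlets) share one socket. I changed the socket to a local variable and the problem went away.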
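The difference between the shared and the local socket comes down to where the object is created. The sketch below shows it with a stand-in class and no network access; `FakeSocket`, `SharedSocketUser`, `PerUserSocketUser`, and `distinct_connections` are hypothetical names used only for illustration.

```python
class FakeSocket:
    """Stand-in for websocket.WebSocket; opens no real connection."""


class SharedSocketUser:
    # Evaluated once, when the class body runs:
    # every instance refers to the same object.
    conn = FakeSocket()


class PerUserSocketUser:
    def __init__(self):
        # Evaluated for each instance: every simulated user owns its socket.
        self.conn = FakeSocket()


def distinct_connections(user_class, n=3):
    """Create n users and count how many different sockets they hold."""
    users = [user_class() for _ in range(n)]  # keep the users alive while counting
    return len({id(user.conn) for user in users})
```

Here `distinct_connections(SharedSocketUser)` returns 1 and `distinct_connections(PerUserSocketUser)` returns 3. With gevent, the first case means several greenlets reading and writing the same socket, which is the situation the error message complains about.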
Revised code:
# Written against the pre-1.0 Locust API (Locust, TaskSet, min_wait/max_wait).
from locust import Locust, events, task, TaskSet
import websocket
import time
import gzip


class WebSocketClient():
    def __init__(self, host):
        self.host = host
        # self.port = port


class WebSocketLocust(Locust):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client = WebSocketClient("172.31.15.85")


class UserBehavior(TaskSet):
    # ws = websocket.WebSocket()
    # self.ws.connect("ws://10.98.64.103:8807")
    # ws.connect("ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws")

    @task(1)
    def buy(self):
        try:
            # The socket is now local to the task, so each simulated user
            # (greenlet) opens and uses its own connection.
            ws = websocket.WebSocket()
            # self.ws.connect("ws://10.98.64.103:8807")
            ws.connect("ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws")
            start_time = time.time()
            # self.ws.send('{"url":"/buy","data":{"id":"123","issue":"20170822","doubled_num":2}}')
            # result = self.ws.recv()
            send_info = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'
            # send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'
            while True:
                # time.sleep(5)
                # ws.send(json.dumps(send_info))
                ws.send(send_info)
                while True:
                    compress_data = ws.recv()
                    result = gzip.decompress(compress_data).decode('utf-8')
                    if result[:7] == '{"ping"':
                        # Heartbeat: echo the timestamp back as a pong.
                        ts = result[8:21]
                        pong = '{"pong":' + ts + '}'
                        ws.send(pong)
                        ws.send(send_info)
                    # Writing results to a file is disabled for the load run.
                    # else:
                    #     # print(result)
                    #     with open('./test_result.txt', 'a') as f:
                    #         # f.write(threading.currentThread().name + '\n')
                    #         f.write(result + '\n')
        except Exception as e:
            print("error is:", e)


class ApiUser(WebSocketLocust):
    task_set = UserBehavior
    min_wait = 100
    max_wait = 200
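The script imports `events` and records `start_time` but never reports anything, so Locust's statistics stay empty for these WebSocket calls. A minimal sketch of a timing wrapper is shown below. `timed_call` and its `on_success` / `on_failure` parameters are hypothetical. The assumption is that in pre-1.0 Locust they would be bound to `events.request_success.fire` and `events.request_failure.fire`, the hooks that version's custom-client pattern uses; check the keyword names against the installed version.

```python
import time


def timed_call(name, func, on_success, on_failure):
    """Run func(), measure it in milliseconds, and report the outcome.

    func is expected to return bytes or str (for example the result of
    ws.recv()); on_success and on_failure are reporting callbacks.
    """
    start = time.perf_counter()
    try:
        result = func()
    except Exception as exc:
        elapsed_ms = int((time.perf_counter() - start) * 1000)
        on_failure(request_type="ws", name=name,
                   response_time=elapsed_ms, exception=exc)
        return None
    elapsed_ms = int((time.perf_counter() - start) * 1000)
    on_success(request_type="ws", name=name,
               response_time=elapsed_ms, response_length=len(result))
    return result
```

Inside the task, `ws.recv()` could then be wrapped as `timed_call("recv", ws.recv, events.request_success.fire, events.request_failure.fire)`, again under the assumption above.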
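Once the test started and the user count rose, the load generator began to see the following errors: HTTP 500 and 502.
These mean the protocol handshake failed. The backend market data application server was also logging a large number of errors.
Inspecting the server showed that its maximum number of open files was 1024; this was raised to 65535. The following steps were used for the adjustment:
Step 1: edit /etc/sysctl.conf and add the following line: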
net.ipv4.ip_local_port_range = 1024 65000
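This sets the system's local port range to 1024 through 65000. Note that the lower bound should be at least 1024 (lower ports are reserved for privileged services) and the upper bound cannot exceed 65535. Save the file after editing.
Step 2: run the sysctl command:
$ sysctl -p
If no error is reported, the new local port range is in effect. With the range above, a single process can in theory establish more than 60,000 simultaneous TCP client connections to the same server address and port.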
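The local port range and the open-file limit are two separate limits: the sysctl setting above controls how many outbound ports are available, while every open TCP connection also consumes one file descriptor in the process that holds it. The sketch below, for Unix-like systems only, shows one way to check both from Python. `parse_port_range` and `raise_soft_nofile` are hypothetical helper names, and the text passed to `parse_port_range` is assumed to be the content of /proc/sys/net/ipv4/ip_local_port_range.

```python
import resource


def parse_port_range(text):
    """Parse 'low high' as stored in ip_local_port_range.

    Returns (low, high, number_of_ports).
    """
    low, high = (int(part) for part in text.split())
    return low, high, high - low + 1


def raise_soft_nofile(target):
    """Raise this process's soft open-file limit toward target.

    The value is capped at the hard limit; the soft limit in effect
    afterwards is returned.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if hard != resource.RLIM_INFINITY:
        target = min(target, hard)
    if soft == resource.RLIM_INFINITY or soft >= target:
        return soft  # already high enough
    resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
    return target
```

For the range configured above, `parse_port_range("1024 65000")` gives 63,977 usable ports. In a shell, the open-file limit is inspected with `ulimit -n`; a limit of 1024 caps a process at roughly a thousand sockets regardless of how many ports are free.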
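After these adjustments the retest still produced the same errors, so the developers were asked to investigate the application itself.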