python-socketio 多程序部署
阿新 • • 發佈:2020-12-06
實現原理
當使用多程序的時候。多個socketio伺服器通過訊息佇列來溝通之間的客戶端sid。若發現該sid在自己的連線中。就由該程序處理髮送給其下面連線的客戶端
詳細的可以看這裡
socket.io要實現多程序以及廣播,房間等功能,勢必需要接入一個redis之類的訊息佇列,進而socket.io的emit會呼叫對應佇列管理器pubsub_manager的emit方法,比如用redis做訊息佇列則最終呼叫 redis_manager中的_publish()
方法通過redis的訂閱釋出功能將訊息推送到flask_socketio頻道。另一方面,每個程序在初始化時都訂閱了 flask_socketio頻道,而且都有一個協程(或執行緒)在監聽頻道中是否有訊息,一旦有訊息,就會呼叫pubsub_manager._handle_emit()方法對本機對應的socket傳送對應的訊息,最終是通過socket.io伺服器的_emit_internal()方法實現對本機中room為sid的所有socket傳送訊息的,如果room為None,則就是廣播,即對所有連線到本機的所有客戶端推送訊息。
作者:ssjhust
連結:https://juejin.cn/post/6844903632819716110
來源:掘金
著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。
服務端實現
設定client_manger即可
mgr = socketio.AsyncRedisManager('redis://127.0.0.1:6379/2')
sio = socketio.AsyncServer(async_mode='asgi',client_manager=mgr)
#!/usr/bin/env python import asyncio import uvicorn import socketio # 新增一行clientManger mgr = socketio.AsyncRedisManager('redis://127.0.0.1:6379/2') sio = socketio.AsyncServer(async_mode='asgi',client_manager=mgr) app = socketio.ASGIApp(sio, static_files={ '/': 'app.html', }) @sio.on('my_event') async def test_message(sid, message): await sio.emit('my_response', {'data': message['data']}, room=sid) @sio.on('connect') async def test_connect(sid, environ): print(sid) await sio.emit('my_response', {'data': 'Connected', 'count': sid}, room=sid) @sio.on('disconnect') def test_disconnect(sid): print('Client disconnected') if __name__ == '__main__': uvicorn.run(app, host='127.0.0.1', port=5000)
實驗效果
部署兩個app 分別監聽5000和50001 表示負載均衡後面的兩個伺服器
然後用a客戶端連線5000。用b客戶端向a發訊息。如果a接收成功證明伺服器之間進行了通訊轉發
app1.py
#!/usr/bin/env python import asyncio import uvicorn import socketio # 新增一行clientManger mgr = socketio.AsyncRedisManager('redis://127.0.0.1:6379/2') sio = socketio.AsyncServer(async_mode='asgi',client_manager=mgr) app = socketio.ASGIApp(sio, static_files={ '/': 'app.html', }) background_task_started = False @sio.on('my_event') async def test_message(sid, message): print(message) @sio.on('disconnect request') async def disconnect_request(sid): await sio.disconnect(sid) @sio.on('connect') async def test_connect(sid, environ): print(sid) await sio.emit('my_response', {'data': 'Connected', 'count': sid}, room=sid) @sio.on('disconnect') def test_disconnect(sid): print('Client disconnected') if __name__ == '__main__': uvicorn.run(app, host='127.0.0.1', port=5000)
app2.py
只需要修改埠號
if __name__ == '__main__':
uvicorn.run(app, host='127.0.0.1', port=5001)
接收訊息的客戶端1 連線app1伺服器
import socketio
sio = socketio.Client()
@sio.event
def connect():
print('connection established')
@sio.on("my_event")
def my_message(data):
print('message received with ', data)
@sio.event
def disconnect():
print('disconnected from server')
sio.connect('http://localhost:5000')
sio.wait()
傳送訊息的客戶端 連線app2
import socketio
import json
sio = socketio.Client()
@sio.event
def connect():
print('connection established')
@sio.on("my_event")
def my_message(data):
print('message received with ', data)
@sio.event
def disconnect():
print('disconnected from server')
sio.connect('http://localhost:5001')
data = json.dumps({"message":"拜託了,另一個我","sid":"d84b2e8cbcde479f813627b6ab8e4a47"},ensure_ascii=False)
sio.emit("my_event",data)
sio.wait()
注意傳送者需要提供sid 才能誇程序傳送成功