1. 程式人生 > 實用技巧 >python-socketio 多程序部署

python-socketio 多程序部署

實現原理

當使用多程序的時候。多個socketio伺服器通過訊息佇列來溝通之間的客戶端sid。若發現該sid在自己的連線中。就由該程序處理髮送給其下面連線的客戶端

詳細的可以看這裡

socket.io要實現多程序以及廣播,房間等功能,勢必需要接入一個redis之類的訊息佇列,進而socket.io的emit會呼叫對應佇列管理器pubsub_manager的emit方法,比如用redis做訊息佇列則最終呼叫 redis_manager中的_publish()

方法通過redis的訂閱釋出功能將訊息推送到flask_socketio頻道。另一方面,每個程序在初始化時都訂閱了 flask_socketio頻道,而且都有一個協程(或執行緒)在監聽頻道中是否有訊息,一旦有訊息,就會呼叫pubsub_manager._handle_emit()方法對本機對應的socket傳送對應的訊息,最終是通過socket.io伺服器的_emit_internal()方法實現對本機中room為sid的所有socket傳送訊息的,如果room為None,則就是廣播,即對所有連線到本機的所有客戶端推送訊息。

作者:ssjhust
連結:https://juejin.cn/post/6844903632819716110
來源:掘金
著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。

服務端實現

設定client_manger即可
mgr = socketio.AsyncRedisManager('redis://127.0.0.1:6379/2')
sio = socketio.AsyncServer(async_mode='asgi',client_manager=mgr)
#!/usr/bin/env python
import asyncio

import uvicorn

import socketio
# 新增一行clientManger
mgr = socketio.AsyncRedisManager('redis://127.0.0.1:6379/2')
sio = socketio.AsyncServer(async_mode='asgi',client_manager=mgr)
app = socketio.ASGIApp(sio, static_files={
    '/': 'app.html',
})

@sio.on('my_event')
async def test_message(sid, message):
    await sio.emit('my_response', {'data': message['data']}, room=sid)

@sio.on('connect')
async def test_connect(sid, environ):
    print(sid)
    await sio.emit('my_response', {'data': 'Connected', 'count': sid}, room=sid)


@sio.on('disconnect')
def test_disconnect(sid):
    print('Client disconnected')


if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=5000)

實驗效果

部署兩個app 分別監聽5000和50001 表示負載均衡後面的兩個伺服器
然後用a客戶端連線5000。用b客戶端向a發訊息。如果a接收成功證明伺服器之間進行了通訊轉發

app1.py

#!/usr/bin/env python
import asyncio

import uvicorn

import socketio
# 新增一行clientManger
mgr = socketio.AsyncRedisManager('redis://127.0.0.1:6379/2')
sio = socketio.AsyncServer(async_mode='asgi',client_manager=mgr)
app = socketio.ASGIApp(sio, static_files={
    '/': 'app.html',
})
background_task_started = False


@sio.on('my_event')
async def test_message(sid, message):
    print(message)

@sio.on('disconnect request')
async def disconnect_request(sid):
    await sio.disconnect(sid)


@sio.on('connect')
async def test_connect(sid, environ):
    print(sid)
    await sio.emit('my_response', {'data': 'Connected', 'count': sid}, room=sid)


@sio.on('disconnect')
def test_disconnect(sid):
    print('Client disconnected')


if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=5000)

app2.py
只需要修改埠號

if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=5001)

接收訊息的客戶端1 連線app1伺服器

import socketio

sio = socketio.Client()

@sio.event
def connect():
    print('connection established')

@sio.on("my_event")
def my_message(data):
    print('message received with ', data)

@sio.event
def disconnect():
    print('disconnected from server')

sio.connect('http://localhost:5000')
sio.wait()

傳送訊息的客戶端 連線app2

import socketio
import json
sio = socketio.Client()

@sio.event
def connect():
    print('connection established')

@sio.on("my_event")
def my_message(data):
    print('message received with ', data)

@sio.event
def disconnect():
    print('disconnected from server')

sio.connect('http://localhost:5001')
data = json.dumps({"message":"拜託了,另一個我","sid":"d84b2e8cbcde479f813627b6ab8e4a47"},ensure_ascii=False)
sio.emit("my_event",data)
sio.wait()

注意傳送者需要提供sid 才能誇程序傳送成功