【rocketmq-client-python】學習筆記
阿新 • • 發佈:2021-08-20
rocketmq-python 是一個基於 rocketmq-client-cpp 封裝的 RocketMQ Python 客戶端。
rocketmq-client-python安裝
目前rocketmq庫只支援linux和mac。
rocketmq-client-python 的安裝:
pip install rocketmq
安裝太慢?國內源安裝:
pip install rocketmq -i https://pypi.tuna.tsinghua.edu.cn/simple
示例程式碼:
Producer
from rocketmq.client import Producer, Message producer = Producer('PID-XXX') producer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')#rocketmq佇列介面地址(伺服器ip:port) # For ip and port name server address, use `set_namesrv_addr` method, for example: # producer.set_namesrv_addr('127.0.0.1:9887') producer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')#可以不使用 producer.start() msg_body = {"id":"test_id","name":"test_name","message":"test_message"} ss = json.dumps(msg_body).encode('utf-8') msg = Message('YOUR-TOPIC') #topic名稱 msg.set_keys('XXX')#每個訊息在業務層面的唯一標識碼,要設定到keys欄位,方便將來定位訊息丟失問題。伺服器會為每個訊息建立索引(雜湊索引),應用可以通過topic,key來查詢這條訊息內容,以及訊息被誰消費。由於是雜湊索引,請務必保證key儘可能唯一,這樣可以避免潛在的雜湊衝突。 msg.set_tags('XXX')#一個應用盡可能用一個Topic,訊息子型別用tags來標識,tags可以由應用自由設定。只有傳送訊息設定了tags,消費方在訂閱訊息時,才可以利用tags在broker做訊息過濾。 msg.set_body(ss) ret = producer.send_sync(msg) print(ret.status, ret.msg_id, ret.offset) producer.shutdown()
其中:
設定ip:port的位置:producer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx')
當只有單一伺服器時,格式是上面這個;
當有多個伺服器地址(叢集模式)時,可以使用:producer.set_namesrv_addr("xxx.xxx.xxx.xxx:xxxxx,xxx.xxx.xxx.xxx:xxxxx")
如果使用pandas資料,pandas資料可以直接轉換
df.to_json(orient='records').encode('utf-8'),然後放入body中傳送。
不同應用的多個Topic使用同一個namesrv_addr時資料傳輸會發生衝突
解決方案:每一個Topic對應一個 “PID-XXX”