python實現的訊息佇列RocketMQ客戶端使用
阿新 • • 發佈:2021-12-16
rocketmq-python 是一個基於 rocketmq-client-cpp 封裝的 RocketMQ Python 客戶端。
一、Producer
#coding:utf-8
import json from rocketmq.client import Producer, Message producer = Producer('PID-001') # 例項化Producer物件,指定group-id(可任意取名) producer.set_namesrv_addr('xxxxxx:xx') # rocketmq佇列介面地址(伺服器ip:port) producer.start() #開啟 # 例項化訊息物件,需要指定應用名:topic_name msg = Message('your_topic_name') # 例項化訊息物件時,傳入topic名稱,一個應用盡可能用一個Topic # 指定訊息的keys msg.set_keys('your_keys') # 業務層面的唯一標識碼,方便將來定位訊息丟失問題。伺服器會為每個訊息建立索引(雜湊索引),應用可以通過topic,key來查詢這條訊息內容,以及訊息被誰消費。 # 指定訊息tags msg.set_tags('your_tags') # 訊息子型別用tags來標識,tags可以由應用自由設定。 #指定訊息體(內容)msg_body = {'name':'laowang','age':28} body = json.dumps(msg_body).encode('utf-8') msg.set_body(body) # 傳入訊息體(json位元組串) # 向佇列傳送訊息 ret = producer.send_sync(msg) print(f'status:{ret.status}') # 0表示OK print(f'msg_id:{ret.msg_id}') # 訊息id,同消費者獲取到的訊息id print(f'offset:{ret.offset}') # 偏移量,預設從0開始,1,2。。。 producer.shutdown()# 關閉
二、PullConsumer
# coding:utf-8 from rocketmq.client import PullConsumer consumer = PullConsumer('CID-001') # 指定group-id consumer.set_namesrv_addr('xxxxxx:xx') # rocketmq佇列介面地址(伺服器ip:port) consumer.start() # 開啟 # 可重複性消費 # 指定topic-name for msg in consumer.pull('your_topic_name'): print(f'id:{msg.id}') # 訊息id print(f'topic:{msg.topic}') # 訊息topic_name print(f'tags:{msg.tags}') # 訊息tags print(f'keys:{msg.keys}') # 訊息Keys print(f'body:{msg.body}') # 訊息體 print('-'*25+'分隔符'+'-'*25) consumer.shutdown() # 關閉
三、PushConsumer
# coding:utf-8 import time from rocketmq.client import PushConsumer # 回撥函式,引數是訊息物件 def callback(msg): print(msg.id, msg.body) consumer = PushConsumer('CID_XXX') # 指定group-id consumer.set_namesrv_addr('127.0.0.1:9887') # rocketmq佇列介面地址(伺服器ip:port) consumer.subscribe('Your_topic', callback) # 訂閱 consumer.start() # 開啟 while True: time.sleep(3600) consumer.shutdown() # 關閉