1. 程式人生 > 其它 >python實現的訊息佇列RocketMQ客戶端使用

python實現的訊息佇列RocketMQ客戶端使用

rocketmq-python 是一個基於 rocketmq-client-cpp 封裝的 RocketMQ Python 客戶端。

一、Producer

#coding:utf-8
import
json from rocketmq.client import Producer, Message producer = Producer('PID-001') # 例項化Producer物件,指定group-id(可任意取名) producer.set_namesrv_addr('xxxxxx:xx') # rocketmq佇列介面地址(伺服器ip:port) producer.start() #
開啟 # 例項化訊息物件,需要指定應用名:topic_name msg = Message('your_topic_name') # 例項化訊息物件時,傳入topic名稱,一個應用盡可能用一個Topic # 指定訊息的keys msg.set_keys('your_keys') # 業務層面的唯一標識碼,方便將來定位訊息丟失問題。伺服器會為每個訊息建立索引(雜湊索引),應用可以通過topic,key來查詢這條訊息內容,以及訊息被誰消費。 # 指定訊息tags msg.set_tags('your_tags') # 訊息子型別用tags來標識,tags可以由應用自由設定。 #指定訊息體(內容)
msg_body = {'name':'laowang','age':28} body = json.dumps(msg_body).encode('utf-8') msg.set_body(body) # 傳入訊息體(json位元組串) # 向佇列傳送訊息 ret = producer.send_sync(msg) print(f'status:{ret.status}') # 0表示OK print(f'msg_id:{ret.msg_id}') # 訊息id,同消費者獲取到的訊息id print(f'offset:{ret.offset}') # 偏移量,預設從0開始,1,2。。。 producer.shutdown()
# 關閉

二、PullConsumer

# coding:utf-8

from rocketmq.client import  PullConsumer

consumer = PullConsumer('CID-001') # 指定group-id
consumer.set_namesrv_addr('xxxxxx:xx') # rocketmq佇列介面地址(伺服器ip:port)
consumer.start() # 開啟

# 可重複性消費
# 指定topic-name
for msg in consumer.pull('your_topic_name'):
    print(f'id:{msg.id}') # 訊息id
    print(f'topic:{msg.topic}') # 訊息topic_name
    print(f'tags:{msg.tags}') # 訊息tags
    print(f'keys:{msg.keys}') # 訊息Keys
    print(f'body:{msg.body}') # 訊息體
    print('-'*25+'分隔符'+'-'*25)
    
consumer.shutdown() # 關閉

三、PushConsumer

# coding:utf-8
import time

from rocketmq.client import PushConsumer

# 回撥函式,引數是訊息物件
def callback(msg):
    print(msg.id, msg.body)


consumer = PushConsumer('CID_XXX') # 指定group-id
consumer.set_namesrv_addr('127.0.0.1:9887') # rocketmq佇列介面地址(伺服器ip:port)
consumer.subscribe('Your_topic', callback) # 訂閱
consumer.start() # 開啟

while True:
    time.sleep(3600)

consumer.shutdown() # 關閉