乘聯會祕書長崔東樹:2021 年 11 月中國佔世界新能源車 60% 份額
阿新 • • 發佈:2022-01-03
消費者
import pika # ########################## 消費者 ########################## # 建立連線 credentials = pika.PlainCredentials("guest", "guest") connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials)) # 開闢管道 channel = connection.channel() channel.queue_declare(queue='msg') def callback(ch, method, properties, body): print(" [x] 傳送訊息 %r" % body) # 告訴rabbitmq,用callback來接收訊息 channel.basic_consume('msg', callback, True) # #接收準備 # channelx.basic_consume(callback, # 收到訊息的回撥函式 # queue="dongchannel11", # 佇列名 # no_ack=True # 是否傳送訊息確認 # ) print(' [*] 等待訊息') # 開始接收資訊,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理 channel.start_consuming()
生產者
import pika import json import time # ######################### 生產者 ######################### # 建立連線 credentials = pika.PlainCredentials("guest", "guest") # 虛擬佇列需要指定引數 virtual_host,如果是預設的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials)) # 開闢管道 channel = connection.channel() # 宣告訊息佇列,訊息將在這個佇列傳遞,如不存在,則建立 channel.queue_declare(queue='msg') # 傳送資料,傳送一條,如果要傳送多條則複製此段 # 向佇列插入數值 routing_key是佇列名 # channel.basic_publish(exchange='', # routing_key='hello', # 佇列名 # body='Hello World!') # 傳送的資料 # # print(" [生產] 傳送 'Hello World!'") # connection.close() # 傳送多條 def send(message): if not isinstance(message, dict): raise {"msg": "不是字典型別"} message = json.dumps(message) channel.basic_publish( exchange='', routing_key='msg', body=message ) connection.close() message = {'type': 'log', 'message': { 'table_name': "hr24_ee_laowang_log", 'log_data': { 'log_name': "api_request_log", 'name': "laowang", "age": 11 } }, 'create_time': int(time.time()) } send(message)