RabbitMQ基本示例
阿新 • • 發佈:2018-11-07
一、RabbitMQ簡介:
''' RabbitMQ就是訊息佇列 之前不是學了Queue了嗎,都是佇列還學RabbitMQ幹嘛? 乾的事情是一樣的 Python的Queue有兩個, 一個執行緒Queue生產者消費者模型,一個程序Queue用於父程序與子程序互動 兩個完全獨立的Python程式就不能互動了,或者兩臺機器之間的Queue,Java跟Python之間不能互動了 所以有了RabbitMQ QQ跟Word通訊: 1、用socket直接通訊 2、通過硬碟通訊 3、QQ通過socket發給中間商,中間商通過socket發給Word 第1個和第3個有啥區別呢? 第一種直接通訊比較複雜, 第二種中間商可以省去網路通訊維護的工作,而且可以實現三方或者更多方的通訊 這個中間商就叫RabbitMQ Python語言連線RabbitMQ的模組有: pika主流常用、 celery分散式訊息佇列'''
二、簡單的示例:
import pika # 相當於建立最基本的socket connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 宣告一個管道 channel = connection.channel() # 在管道里宣告一個佇列 channel.queue_declare(queue='q1') # 通過管道發訊息 channel.basic_publish(exchange='', routing_keyproducer生產者='q1', # 佇列名 body='Hello World') # 訊息內容 print("send: Hello World") connection.close()
import pika ''' 消費者可能在其他機器上跨機器是沒問題的 ''' # 建立連線. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 建立管道 channel = connection.channel()consumer消費者# 宣告佇列,為什麼還要宣告佇列? # 如果是生產者先執行,可以不寫這個。 # 由於不確定生產者先執行還是消費者先執行,所以寫上這個,隨便哪個先執行都不會報錯。 channel.queue_declare(queue='q1') def callback(ch, method, properties, body): # ch:管道記憶體物件地址,method:略,properties:略 print("-->", ch, method, properties) print("received:{0}".format(body)) # 開始消費訊息 channel.basic_consume(callback, # 如果收到訊息就呼叫該函式處理訊息 queue='q1', no_ack=True) print("Waiting for messages...") # 開始收訊息了。啟動就一直執行,一直收下去,沒有就阻塞住 channel.start_consuming()
執行結果:先啟動消費者,再啟動生產者(多個生產者,一個消費者)
''' 消費者控制檯(啟動時): Waiting for messages... 生產者控制檯(第一次執行): send: Hello World 消費者控制檯(變化1): Waiting for messages... --> <BlockingChannel impl=<Channel number=1 OPEN.... received:b'Hello World' 生產者控制檯(第二次執行): send: Hello World 消費者控制檯(變化2): Waiting for messages... --> <BlockingChannel impl=<Channel number=1 OPEN.... received:b'Hello World' --> <BlockingChannel impl=<Channel number=1 OPEN.... received:b'Hello World' '''
三、訊息分發輪詢:
上面的示例第二種執行結果:先啟動消費者,再啟動生產者(多個生產者,多個消費者)
''' 消費者控制檯(啟動消費者1): Waiting for messages... 消費者控制檯(啟動消費者2): Waiting for messages... 消費者控制檯(啟動消費者3): Waiting for messages... 生產者控制檯(第一次執行): send: Hello World 消費者1控制檯(變化): Waiting for messages... --> <BlockingChannel impl=<Channel number=1 OPEN.... received:b'Hello World' 生產者控制檯(第二次執行): send: Hello World 消費者2控制檯(變化): Waiting for messages... --> <BlockingChannel impl=<Channel number=1 OPEN.... received:b'Hello World' 生產者控制檯(第三次執行): send: Hello World 消費者3控制檯(變化): Waiting for messages... --> <BlockingChannel impl=<Channel number=1 OPEN.... received:b'Hello World' 生產者控制檯(第四次執行): send: Hello World 消費者1控制檯(變化): Waiting for messages... --> <BlockingChannel impl=<Channel number=1 OPEN.... received:b'Hello World' --> <BlockingChannel impl=<Channel number=1 OPEN.... received:b'Hello World'
輪詢分發... '''