1. 程式人生 > >RabbitMQ基本示例

RabbitMQ基本示例

一、RabbitMQ簡介:

'''
RabbitMQ就是訊息佇列
  之前不是學了Queue了嗎,都是佇列還學RabbitMQ幹嘛?
  乾的事情是一樣的
  Python的Queue有兩個,
     一個執行緒Queue生產者消費者模型,一個程序Queue用於父程序與子程序互動
     兩個完全獨立的Python程式就不能互動了,或者兩臺機器之間的Queue,Java跟Python之間不能互動了
     所以有了RabbitMQ
     
QQ跟Word通訊:
    1、用socket直接通訊
    2、通過硬碟通訊
    3、QQ通過socket發給中間商,中間商通過socket發給Word
    第1個和第3個有啥區別呢?
    第一種直接通訊比較複雜,
    第二種中間商可以省去網路通訊維護的工作,而且可以實現三方或者更多方的通訊
    這個中間商就叫RabbitMQ
Python語言連線RabbitMQ的模組有:
    pika主流常用、 celery分散式訊息佇列
'''

二、簡單的示例:

import pika

# 相當於建立最基本的socket
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 宣告一個管道
channel = connection.channel()
# 在管道里宣告一個佇列
channel.queue_declare(queue='q1')
# 通過管道發訊息
channel.basic_publish(exchange='',
                      routing_key
='q1', # 佇列名 body='Hello World') # 訊息內容 print("send: Hello World") connection.close()
producer生產者

 

import pika

'''
消費者可能在其他機器上跨機器是沒問題的
'''
# 建立連線.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 建立管道
channel = connection.channel()
# 宣告佇列,為什麼還要宣告佇列? # 如果是生產者先執行,可以不寫這個。 # 由於不確定生產者先執行還是消費者先執行,所以寫上這個,隨便哪個先執行都不會報錯。 channel.queue_declare(queue='q1') def callback(ch, method, properties, body): # ch:管道記憶體物件地址,method:略,properties:略 print("-->", ch, method, properties) print("received:{0}".format(body)) # 開始消費訊息 channel.basic_consume(callback, # 如果收到訊息就呼叫該函式處理訊息 queue='q1', no_ack=True) print("Waiting for messages...") # 開始收訊息了。啟動就一直執行,一直收下去,沒有就阻塞住 channel.start_consuming()
consumer消費者

執行結果:先啟動消費者,再啟動生產者(多個生產者,一個消費者)

'''
消費者控制檯(啟動時):
  Waiting for messages...
生產者控制檯(第一次執行):
  send: Hello World
消費者控制檯(變化1):
  Waiting for messages...
  --> <BlockingChannel impl=<Channel number=1 OPEN....
  received:b'Hello World'
生產者控制檯(第二次執行):
  send: Hello World
消費者控制檯(變化2):
  Waiting for messages...
  --> <BlockingChannel impl=<Channel number=1 OPEN....
  received:b'Hello World'
  --> <BlockingChannel impl=<Channel number=1 OPEN....
  received:b'Hello World'
'''

 三、訊息分發輪詢:

上面的示例第二種執行結果:先啟動消費者,再啟動生產者(多個生產者,多個消費者)

'''
消費者控制檯(啟動消費者1):
  Waiting for messages...
消費者控制檯(啟動消費者2):
  Waiting for messages...
消費者控制檯(啟動消費者3):
  Waiting for messages...
生產者控制檯(第一次執行):
  send: Hello World
消費者1控制檯(變化):
  Waiting for messages...
  --> <BlockingChannel impl=<Channel number=1 OPEN....
  received:b'Hello World'
生產者控制檯(第二次執行):
  send: Hello World
消費者2控制檯(變化):
  Waiting for messages...
  --> <BlockingChannel impl=<Channel number=1 OPEN....
  received:b'Hello World'
生產者控制檯(第三次執行):
  send: Hello World
消費者3控制檯(變化):
  Waiting for messages...
  --> <BlockingChannel impl=<Channel number=1 OPEN....
  received:b'Hello World'
生產者控制檯(第四次執行):
  send: Hello World
消費者1控制檯(變化):
  Waiting for messages...
  --> <BlockingChannel impl=<Channel number=1 OPEN....
  received:b'Hello World'
  --> <BlockingChannel impl=<Channel number=1 OPEN....
  received:b'Hello World'
輪詢分發...
'''