1. 程式人生 > >python學習-rabbitMQ篇

python學習-rabbitMQ篇

while direct call 隊列 inf 其他 rac 解決 ctrl+c

一、簡介

 RabbitMQ,它是幹嘛用的吶?它是用來發消息的,消息隊列,那它跟我們之前的學習的python的線程queue和進程的queue有什麽區別呢?其實他們幹的事情都是一樣的。先來說說我們之前學習的python的queue吧。

  1. 線程queue:只是用於多個線程之間,進行數據同步交互的。
  2. 進程queue:只是用戶父進程與子進程進行交互,或者屬於同一父進程下的多個子進程進行交互。

  如果是兩個獨立的程序,即便是python 程序,兩個完全獨立的python程序也依然是不用這個python的這個線程或者進程queue來通信的。

  那麽問題來了,我現在兩個獨立的python程序,或者python跟Java程序,或者跟PHP程序,或者兩臺獨立機器之間的也涉及到生產者消費者模型,這個時候用python的線程queue和進程queue就通信不了了。那怎麽辦呢?這個時候我們只能搞一個中間代理,這個中間代理就是RabbitMQ。

二、消息發送方式如圖:

技術分享圖片

三、rabbitMQ安裝

windows下直接到官網下載即可 地址:http://www.rabbitmq.com/install-windows-manual.html

安裝完畢後在windows的服務中即出現了rabbitMQ的服務,如果沒有啟動建議設置為自動隨機啟動。

四、rabbitMQ工作原理如下:

技術分享圖片

五、rabbitMQ基本應用示例

1、 生產者(Producer)主要工作步驟如下:

建立socket->聲明管道->聲明queue->通過一個空的exchange發送內容至queue(不能直接發送到隊列)->關閉連接

import pika
 
#通過這個實例先建立一個socket
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
#聲明一個管道
channel = connection.channel()
#聲明queue
channel.queue_declare(queue="basic_1") #這邊給queue起名字叫"basic_1"
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange="",
                      routing_key="basic_1",  #queue的名字
                      body="hello world")  #body是你發送的內容
print("[x] Sent ‘hello world‘")
#直接關閉連接
connection.close()

2、消費者(Consumer)主要工作步驟如下:

創建socket連接->創建管道->聲明queue->聲明回調函數callback ->消費的消息->開啟消費

## 消費者有可能在其他的機器上
import  pika
#建立一個socket連接
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
#創建一個管道
channel = connection.channel()
#You may ask why we declare the queue again ? we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue="basic_1")
 
def callback(ch,method,properites,body):
    print("--->",ch,method,properites)
    print(" [x] Received %r" % body)

‘‘‘ ch:是send端管道的內存對象的地址
  method:指的send端的是發給誰,發給哪個Q的一些信息,一般不怎麽用
  properites:send端的屬性,這邊至的send端發過來給recive端的屬性
  body:是send端發過來的消息
‘‘‘

channel.basic_consume(#消費的消息
                      callback,  #如果收到消息,就調用callback函數來處理消息
                      queue="basic_1",#queue的名字
                      no_ack=True)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
#這個start只要一啟動,就一直運行,它不止收一條,而是永遠收下去,沒有消息就在這邊卡住
channel.start_consuming()

總結: 

1、consumers中如果不聲明queue的話,則如果consumers先啟動,則會報錯。如果是producer先啟動,consumers後啟動則不報錯。但是如果說consumer聲明了,consumer先啟動就不會報錯。如果是producers先啟動的話,則忽略。
2、所有的socket傳輸都是bytes類型。
3、消費者和生產者不一定在同一臺機器上,在其他機器上運行也是可以的。
4、consumers啟動以後會一直運行下去,它會永遠的收下去。producers可以運行多次,只要運行一次,consumers就會接收一次。

六、消息分發輪詢

1、一個生產者對應多個消費者是采用輪詢機制,公平的依次發給每一個消費者,每個消費者消費1個。
2、一個生產者對應多個消費者,生產者發送多次消息,是采用輪詢的機制,公平的分給每一個消費者。
3、消費者代碼中no_ack=True,一般情況下是不加的,保證了連接斷開,消息就會轉給下一個消費者。當添加了之後,代表消費者拿到數據的時候,ranbbitMQ即刪除數據,如果此時消費者出現異常,則會產生數據丟失的情況(刪除數據需要在消費者的callback函數中添加下面一段:

channel.basic_ack(delivery_tag=method.delivery_tag)

)。
4、RabbitMQ判斷如果socket斷了,就知道連接斷了,消息就會轉給下一個消費者。
5、消費者的啟動順序,代表著是第幾個消費者。

七、RabbitMQ 消息持久化

1、隊列持久化 :此時當rabbitMQ重啟後,隊列存在,但隊列中數據消失

在生產者和消費者的隊列聲明時,均變為:

#聲明隊列queue
channel.queue_declare(queue=‘hello‘,durable=True)

2、消息持久化:此時當rabbitMQ重啟後,隊列存在,但隊列中數據仍存在,即消息持久化

首先需要隊列持久化,然後在生產端消息發布消息時修改為:

#發送數據
channel.basic_publish(exchange=‘‘,routing_key=‘hello5‘,
body=‘hello,you are welcomed!111222‘,
properties=pika.BasicProperties(
delivery_mode=2,
)
)
即可實現消息持久化,即當rabbitMQ重啟後,該消息未消費時,仍將繼續在隊列中。

八、消息公平分發

如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那裏堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

技術分享圖片

channel.basic_qos(prefetch_count=1)

  註意了,這種公平指的是你消費者有多大本事,就幹多少活,你消費者處理的越慢,我就分發的少,你消費者處理的越多,處理的快,我就多發點消息。我server端給客戶端發消息的時候,先檢查一下,你現在還有多少消息,你如果處理的消息超過1條,我就不給你發了,就是你當前消息沒有處理完畢,我就不給你發消息了,沒有消息,我就給你發。

九、exchange的類型(廣播模式)

之前的例子都基本都是隊列級別的 1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了,先來說說exchange的官方說明:

  An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

  Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息

  1. fanout:所有bind到此exchange的queue都可以接收消息(純廣播的,所有消費者都能收到消息)
  2. direct:通過routingKey和exchange決定的那個唯一的queue可以接收消息
  3. topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
  4. headers:通過headers 來決定把消息發給哪些queue(這個很少用,一般情況下,我們用不到)
9.1 fanout廣播模式

說明:fanout這種模式是所有綁定exchange的queue都可以接收到消息。exchange=>轉換器

生產者(fanout_publiser)

說明:跟之前寫的不同,生產者這邊並沒有聲明queue,因為生產者是以廣播的形式,所以這邊不需要聲明queue

import pika
 
#創建socket連接
connection = pika.BlockingConnection(pika.ConnectionParameters
                                     (host=‘localhost‘))
#創建管道
channel = connection.channel()
 
#聲明exchange,且exchange的名字是logs,exchange的類型為fanout
channel.exchange_declare(exchange=‘logs‘,exchange_type="fanout")
 
#發送的消息
message = "info:hello world"
 
#生產一個消息
channel.basic_publish(
    exchange="logs",
    routing_key=‘‘,
    body=message
)
print("[X] Send {0}".format(message))
 
#關閉連接
connection.close()

消費者(fanout_consumer)

說明:消費者這邊要聲明一個唯一的queue_name的對象,並且從對象中獲取queue名

import pika
#創建一個socket
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host="localhost"))
#創建一個管道
channel = connection.channel()
#聲明exchange,exchange的名字logs,類型是fanout廣播模式
channel.exchange_declare(exchange="logs",
                         exchange_type="fanout")
#不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除,result是queue的對象
result = channel.queue_declare(exclusive=True) #exclusive=>排他的,唯一的
#獲取queue名
queue_name = result.method.queue
#綁定exchange
channel.queue_bind(exchange="logs",
                   queue=queue_name)
 
print(‘ [*] Waiting for logs. To exit press CTRL+C‘)
#聲明回調函數
def callback(ch,method,properties,body):
    "回調函數"
    print("[X] {0}".format(body))
#消費者消費
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
#啟動消費模式
channel.start_consuming()

  技術分享圖片

1、服務端沒有聲明queue,為什麽客戶端要聲明一個queue?
生產者發消息到exchange上,exchange就會遍歷一遍,所有綁定它的queue,然後把消息發到queue裏面,它發了queue就不管了,消費者從queue裏面去收,所以就收到廣播了,而不是說exchange直接就把消息發給消費者,消費者只會從queue裏去讀消息,且拿著queue去綁定exchange。
2、為什麽queue要自動生成,而不是自己手動去寫?
這個queue只是為了收廣播的,隊列是當消費者連接時自動生成的,每次生成的隊列不一樣,當消費者停止消費時,隊列自動銷毀

3、廣播實時性
  廣播是實時的,你不在的時候,就是你消費者沒有開啟的時候,發消息的時候,就沒有收到,這個時候就沒有了。如果消費者開啟了,生產者發消息時,消費者是收的到的,這個又叫訂閱發布,收音機模式

9.2 Direct廣播模式

隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

direct廣播模式邏輯圖:

技術分享圖片

生產者代碼:

import pika,sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters
                                     ("localhost"))
channel = connection.channel()
#定義direct類型的exchange
channel.exchange_declare(exchange="direct_logs",
                         exchange_type="direct")
#定義重要程度,定義什麽級別的日誌
severity = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ‘ ‘.join(sys.argv[2:]) or "hello world"
#發送消息
channel.basic_publish(exchange="direct_logs",
                      routing_key=severity,
                      body=message
                      )
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

消費者代碼:

import pika,sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters
                                     ("localhost"))
channel = connection.channel()
#定義direct類型的exchange
channel.exchange_declare(exchange="direct_logs",exchange_type="direct")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
#手動輸入安全級別
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
#循環遍歷綁定消息隊列
for severity in severities:
    channel.queue_bind(exchange="direct_logs",
                       queue=queue_name,
                       routing_key=severity)
print(‘ [*] Waiting for logs. To exit press CTRL+C‘)
 
def callback(ch,method,properites,body):
    "回調函數"
    print(" [x] %r:%r" % (method.routing_key, body))
#消費消息
channel.basic_consume(callback,queue=queue_name,no_ack=True)
 
channel.start_consuming()

這種模式下,根據服務器執行程序時的參數的不一樣而消費不同廣播的數據。

消費者在啟動時需添加參數 info warning error 其中之一,即指定了消費的severity

則生產端啟動時默認不輸入時為info級別,否則按輸入的來,按照指定的routing_key發送數據。

9.3 topic細致的消息過濾模式

  在direct模式中我們做了一個區分,把error、warning綁定級別把消息區分了。我們回到日誌上,如果想做的更細致的區分,比如說,你現在搜索的有error,有warning等,在Linux上有一個系統日誌,這個系統日誌搜索所有應用的系統日誌。所有程序都在這個日誌裏面打日誌。那如果我想劃分出來。什麽是mysql的發出來的日誌,什麽是apache發出來的日誌。然後mysql日誌裏面同時是info,又包含warning,error。Apache也是一樣,所以我們要做更細的區分,更細致的消息過濾。

topic廣播模式邏輯圖:

技術分享圖片

代碼實現:

生產者:

import pika,sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters
                                     ("localhost"))
channel = connection.channel()
#聲明一個topic的exchange
channel.exchange_declare(exchange="topic_logs",
                         exchange_type="topic")
 
routing_key = sys.argv[1] if len(sys.argv) > 1 else "anonymous.info"
 
message = " ".join(sys.argv[2:]) or "hello world"
channel.basic_publish(exchange="topic_logs",
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 消費者:

import pika,sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
 
channel = connection.channel()
#聲明topic類型的exchange
channel.exchange_declare(exchange="topic_logs",
                         exchange_type="topic")
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
banding_keys = sys.argv[1:]
if not banding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
#循環綁定queue
for banding_key in banding_keys:
    channel.queue_bind(exchange="topic_logs",
                       queue=queue_name,
                       routing_key=banding_key)
 
print(‘ [*] Waiting for logs. To exit press CTRL+C‘)
#回調函數
def callback(ch,method,properites,body):
    "回調函數"
    print(" [x] %r:%r" % (method.routing_key, body))
#消費者消費
channel.basic_consume(callback,queue=queue_name,no_ack=True)
 
channel.start_consuming()

  

生產端執行:
python topic_sender.py mysql.info system srated sucess!
python topic_sender.py app.error nullpiont error!

即在生產者的第一個參數裏寫明字段,
如應用 app.error app.info app.warning
mysql mysql.error mysql.info mysql.warning
等等

第二個參數寫各種情況的具體信息,如異常、信息和警告。

在消費時,按照下面規則進行匹配
To receive all the logs run: =>
# 是匹配所有的
python receive_logs_topic.py "#"

#只匹配app開頭的
python receive_logs_topic.py "app.*"

#只匹配error結尾的
python receive_logs_topic.py "*.error"

You can create multiple bindings:
#創建多個接收隊列
python receive_logs_topic.py "app.*" "*.error"
#只匹配一類消息
python receive_logs_topic.py "app.info"

9.4 rabbitMQ RPC實現

之前我們都是單向發送消息,客戶端發送消息給服務端,那麽問題來了,我現在發一個命令給遠程客戶端,讓它去執行,執行之後的結果,我想把這個結果再返回。這個模型叫什麽吶,這種模型叫RPC=>remote procedure call。

  怎麽返回這個消息呢?

  答:就server 端和客戶端既是消費者,又是生產者。

RPC模式邏輯圖:

技術分享圖片

代碼實現:

RPC ----CLIENT

import pika,uuid,time
 
class FibonacciRpcClient(object):
    "斐波那契數列rpc客戶端"
 
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters
                                                  (host="localhost"))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response,no_ack=True,
                                   queue=self.callback_queue)
 
    def on_response(self,ch,method,props,body):
        print("---->",method,props)
        if self.corr_id == props.correlation_id: #我發過去的結果就是我想要的結果,保持數據的一致性
            self.response = body
 
    def call(self,n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.publish(exchange="",
                             routing_key="rpc_queue",
                             properties=pika.BasicProperties(
                                 reply_to=self.callback_queue,
                                 correlation_id=self.corr_id),
                             body=str(n))
        while self.response is None:
            self.connection.process_data_events() #非阻塞版的start_consumer()
            print("no msg....")
            time.sleep(0.5)
        return int(self.response)
 
if __name__ == "__main__":
    fibonacci_rpc = FibonacciRpcClient()
    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(30)
    print(" [.] Got %r" % response)

註:
1、想不阻塞,而是想每過一段時間,就過來檢查一下,就不能用start_consumer,而是用connection.process_data_evevts(),它是不阻塞的,如果收到消息就收到,收不到消息也返回,就繼續往下執行。
2、reply_to就是想讓服務器執行完命令之後,把結果返回到這個queue裏面。
3、在while self.respose is None中的代碼我可以不做time.sleep,我這邊可以發消息給服務器端,這個消息不一定按順序發給服務器端,如果不做self.corr_id == props.correlation_id的驗證,那數據就可能對不上了。

RPC ---- SERVER

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="rpc_queue")


def fib(n):
    "斐波那契數列"
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(exchange="",
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=                                                          props.correlation_id),
                     # props的是客戶端的發過來的信息,這邊把correlation_id返回給客戶端做驗證
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue="rpc_queue")

print(" [x] Awaiting RPC requests")
channel.start_consuming()

  

註:props.reply_to,這個就是客戶端返回過來的queue。

如果客戶端和服務用的是同一個queue,客戶端也發到rpc_queue中,那麽客戶端就會收到自己的消息,就會形成一個死循壞,無法處理數據

python學習-rabbitMQ篇