rabbitMq超詳解
在此向前輩們致敬:http://blog.csdn.net/shatty/article/details/9529463
為什麽要學rabbitMQ
在此之前,我們想來進行一個概念區分
threading queue :只能用於線程之間的消息傳發
進程queue:可以用於進程(父進程與子進程或者同屬於同一父進程之間的子進程交互)之間的消息傳發
那麽不同的語言之間,不同的機器之間怎麽實現相互通信呢,這是一個問題吧
因此,我們的rabbitMq就起了很大的作用
接下來,我們對函數進行一一的相關介紹
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))#固定格式,創建一個類似於socket連接,,因為是在本地進行,所以可以直接用localhost
如果與其他電腦連接
pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘,5672,‘simple‘,credentials))這樣
我們來看一看關於這個函數的介紹
def get_connection_parameters (self ,host ,port ,vhost ,username ,password , heartbeat_interval ):“”“返回一個pika連接的連接參數。 :參數str主機:連接到的RabbitMQ主機 :param int port:連接的端口 :param str vhost:虛擬主機 :參數str用戶名:使用的用戶名 :參數str密碼:使用的密碼 :param int heartbeat_interval:AMQP心跳間隔 :rtype:pika。ConnectionParameters “””
第三步:channel = connection.channel() #在連接上創建一個頻道
channel = connection.channel() #進行一個管道的聲明
channel.queue_declare(queue=‘hello‘) #聲明一個隊列,進行消息的傳送#客戶端與服務端都需要這樣
#註意以上都是套路,固定格式
接下來就是消息的發送呢
channel.basic_publish(exchange=‘‘,routing_key=‘hello‘,#消息隊列的名字
body=‘Hello World!‘)#消息的內容
connection.close() #當生產者發送完消息後,可選擇關閉連接
我們再來看看消費者的寫法
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(
‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘hello‘)#這個是防止不知道誰先運行而造成的錯誤
#上面的幾行代碼都是套路,服務端的時候已介紹,此處不做過解釋
def callback(ch, method, properties, body):print(" [x] Received %r" % body)
#下面代碼是對消息的處理
channel.basic_consume(callback,#一旦接收到消息,就調用callback函數queue=‘hello‘,
no_ack=True)
原理代碼如下:生產者:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))
channel = connection.channel()
# 聲明queue
channel.queue_declare(queue=‘hello‘)
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,
body=‘Hello World!‘)
print(" [x] Sent ‘Hello World!‘")
connection.close()
消費者:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(
‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘hello‘)
def callback(ch, method, properties, body):
print(‘--->‘,ch,method,properties,body)
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue=‘hello‘,
no_ack=True)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
最後結果為:
C:\Python\Python36\python.exe C:/Users/Administrator/PycharmProjects/untitled3/python/day9/消費者.py
[*] Waiting for messages. To exit press CTRL+C
---> <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=(‘::1‘, 58661, 0, 0)->(‘::1‘, 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> #ch是我們剛聲明的內存對象的地址<Basic.Deliver([‘consumer_tag=ctag1.3ee0d6275e9f43288f95fe2ba2c83e1a‘, ‘delivery_tag=1‘, ‘exchange=‘, ‘redelivered=False‘, ‘routing_key=hello‘])> #這個包含你要把消息發給哪個queue的信息<BasicProperties> b‘Hello World!‘
[x] Received b‘Hello World!‘
好了,我們可以同時開三個消費者,不斷地接收消息,
那麽生產者沒有收到接收消息的確認怎麽辦呢
消費者:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(
‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘hello‘)
def callback(ch, method, properties, body):
print(‘--->‘,ch,method,properties,body)
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue=‘hello‘,
#no_ack=True
)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
生產者:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))
channel = connection.channel()
# 聲明queue
channel.queue_declare(queue=‘hello‘)
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,
body=‘Hello World!‘)
print(" [x] Sent ‘Hello World!‘")
connection.close()
結果是發現,生產者發送給一個消費者的消息傳遞給生產者了
rabbitMq超詳解