1. 程式人生 > >day11 rabbitmq redis rpc命令端

day11 rabbitmq redis rpc命令端

一、Rabbit MQ

1、工作佇列

工作佇列就是多個work共同按順序接收同一個queue裡面的任務,還可以設定basic_qos來確保當前的任務執行完畢後才繼續接收任務。

 

import pika

# 連線
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 申明佇列
channel.queue_declare(queue="work_queue", durable=True)     # durable 持久化,rabbit重啟這個queue也不會丟失

messages = ["apple", "pear", "cherry", "banana", "watermelon"]

for message in messages:
    # 傳送訊息,routing表示要傳送到那個queue,body就是傳送的訊息內容,properties是其他的一些配置,可以設定多個
    channel.basic_publish(exchange="", routing_key="work_queue", body=message, properties=pika.BasicProperties(
        delivery_mode=2     # 傳送的訊息持久化,前提是queue也是持久化到的
    ))
    print("send {message} ok".format(message=message))

# channel.queue_delete(queue="work_queue")    # 刪除queue
# 關閉連線
conn.close()

  

import pika
import time

# 連線
cred = pika.PlainCredentials("Glen", "Glen[1234]")  # 使用者名稱密碼等資訊
# conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672, virtual_host="/", credentials=cred))
channel = conn.channel()

# 回撥函式
def callbak(ch, method, properties, body):
    print("body:", body)
    time.sleep(1)
    print("done..")
    print("method.delivery_tag", method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)      # 這裡的功能和no_ack類似,突然終端queue會將任務繼續分配給下一個work

"""
使用basic_qos設定prefetch_count=1,使得rabbitmq不會在同一時間給工作者分配多個任務,
即只有工作者完成任務之後,才會再次接收到任務。
"""
channel.basic_qos(prefetch_count=1)
# channel.queue_declare(queue="work_queue")
channel.basic_consume(callbak, queue="work_queue", no_ack=False)    # no_ack 預設使False,需要等待callback執行完畢才算這個訊息處理完畢
channel.start_consuming()

"""
這裡多個work會按順序接收producer釋出的任務,處理完成後才繼續接收
"""

  

2、交換機  

producer先將訊息傳送到交換機exchange,然後exchange再將訊息傳送給所有幫繫結的queue,即將訊息廣播出去

 

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義交換機
"""
fanout: 所有bind到此exchange的queue都可以接收訊息
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收訊息
topic:所有符合routingKey(此時可以是一個表示式)的routingKey所bind的queue可以接收訊息
"""
channel.exchange_declare(exchange="message", exchange_type="fanout")

while True:
    message = input(">>")
    # 直接傳送到exchange,接收端使用隨機的queue來繫結exchange,然後接收
    channel.basic_publish(exchange="message", routing_key="", body=message)
    print("send {message} ok".format(message=message))

  

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義交換機
channel.exchange_declare(exchange="message", exchange_type="fanout")

# 生成隨機的queue,並繫結到交換機
result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = result.method.queue    # 獲取隨機勝場的queue名字

# 將隨機的queue繫結到exchange
channel.queue_bind(exchange="message", queue=queue_name)

def callback(ch, method, properties, body):
    print(body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

  

3、路由器

direct和路由器類似,傳送小時的時候需要指定目的地routing_key,只有對應的queue才會接收

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義路由鍵
"""
fanout: 所有bind到此exchange的queue都可以接收訊息
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收訊息
topic:所有符合routingKey(此時可以是一個表示式)的routingKey所bind的queue可以接收訊息
"""
channel.exchange_declare(exchange="message2", exchange_type="direct")

while True:
    message, routing = input(">>").split()
    # 傳送訊息的時候同時指定routing_key,只有對應routing_key的consumer才會接收到
    # 傳送訊息示例:info_message info
    channel.basic_publish(exchange="message2", routing_key=routing, body=message)   # 傳送的每個訊息都要指明路由
    print("send {message} {routing} ok".format(message=message, routing=routing))

  

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義交換機
channel.exchange_declare(exchange="message2", exchange_type="direct")

# 生成隨機的queue,並繫結到交換機
result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = result.method.queue    # 獲取隨機勝場的queue名字
# channel.queue_bind(exchange="message2", routing_key="info", queue=queue_name)
channel.queue_bind(exchange="message2", routing_key="warning", queue=queue_name)    # 繫結不同的routing_key
# channel.queue_bind(exchange="message2", routing_key="error", queue=queue_name)

def callback(ch, method, properties, body):
    print(body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

  

4、路由模糊匹配

producer傳送訊息的時候可以模糊地指定接收的queue,如有多個queue, mysql.error  redis.eror  mysql.info redis.info,指定不同的routing_key可以匹配到不同的queue,mysql.* 可以匹配到mysql.error,mysql.info, *.error可以匹配redis.error,mysql.error。“#”表示所有、全部的意思;“*”只匹配到一個詞。

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義路由鍵
"""
fanout: 所有bind到此exchange的queue都可以接收訊息
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收訊息
topic:所有符合routingKey(此時可以是一個表示式)的routingKey所bind的queue可以接收訊息
"""
channel.exchange_declare(exchange="message3", exchange_type="topic")

"""
傳送的訊息如下:
a happy.work
b happy.life
c sad.work
d sad.life 
"""
while True:
    message, routing = input(">>").split()
    channel.basic_publish(exchange="message3", routing_key=routing, body=message)   # 傳送的每個訊息都要指明路由
    print("send {message} {routing} ok".format(message=message, routing=routing))

  

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義交換機
channel.exchange_declare(exchange="message3", exchange_type="topic")

# 生成隨機的queue,並繫結到交換機
result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = result.method.queue    # 獲取隨機勝場的queue名字
# channel.queue_bind(exchange="message3", routing_key="#", queue=queue_name)    # 可以接收任何訊息
# channel.queue_bind(exchange="message3", routing_key="happy.*", queue=queue_name)    # 繫結不同的routing_key
channel.queue_bind(exchange="message3", routing_key="*.work", queue=queue_name)

def callback(ch, method, properties, body):
    print(body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

  

5、rpc遠端呼叫返回

遠端呼叫相當於有一個控制中心和多個計算節點,控制中心發指令呼叫遠端的計算節點的函式進行計算,然後將結果返回給計算中心,pika模組也實現了該功能

import pika
import time

# 建立連線
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義佇列
channel.queue_declare(queue="rpc_queue")

# 執行的函式
def mul(n):
    time.sleep(5)
    return n * n

# 定義接收到訊息的處理方法
def message_handle(ch, method, properties, body):
    print("{body} * {body} = ?".format(body=body))
    response = mul(int(body))
    # 將計算結果返回
    ch.basic_publish(exchange="", routing_key=properties.reply_to, body=str(response))
    # 返回執行成功
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(message_handle, queue="rpc_queue")
channel.start_consuming()

  

import pika
import threading


class Center(object):
    def __init__(self):
        self.response = ""
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71"))
        self.channel = self.connection.channel()
        # 定義接收返回訊息的佇列 然後在傳送命令的時候作為引數傳遞過去,rpc執行完畢後將訊息傳送到這個queue裡面
        self.callback_queue = self.channel.queue_declare(exclusive=True).method.queue
        self.channel.basic_consume(self.response_hand, no_ack=True, queue=self.callback_queue)

    # 定義處理返回訊息的函式
    def response_hand(self, ch, method, properties, body):
        self.response = body
        print(body)

    def request(self, n):
        self.response = ""
        # 傳送計算請求,同時加上返回佇列名
        self.channel.basic_publish(body=str(n), exchange="", routing_key="rpc_queue", properties=pika.BasicProperties(
            reply_to=self.callback_queue
        ))
        # 等待接收返回資料
        while self.response is "":
            self.connection.process_data_events()
        return int(self.response)


while True:
    message = input(">>")
    if not message.isdigit():
        continue
    center = Center()
    t = threading.Thread(target=center.request, args=(int(message), ))      # 啟用多執行緒,可以不阻塞執行命令
    t.start()