RabbitMQ中的Exchange Types
整理自:
- https://www.rabbitmq.com
- https://www.cloudamqp.com/blog/2015-09-03-part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html
- 《RabbitMQ實戰》
在RabbitMQ中, 生產者把訊息傳送到一個交換機(exchange)上,而不是直接釋出到佇列上。交換機是一個訊息路由代理,它負責把訊息路由到不同的佇列中去(通過header 屬性,bindings,routing key)。
繫結(binding)是佇列和交換機的一個連結。
路由鍵(routing key)是一個訊息屬性,交換機在決定把這條訊息路由到佇列中時,也許會根據這個key來做判斷。(這取決於交換機的型別)
標準的RabbitMQ訊息流:
- 生產者把訊息釋出到交換機上
- 交換機收到訊息,負責把訊息路由
- 一個佇列和交換機的繫結被建立,交換機把訊息路由到佇列中
- 佇列中的訊息被消費者處理
交換機、連線(connections)和佇列在建立時可以被一些引數來配置,比如durable, temporary, 和 auto delete。Durable的交換機在伺服器重啟時會存活下來,直到被明確的刪除。Temporary的交換器在RabbitMQ關閉的時候就沒了。Auto Deleted的交換機在最後一個被繫結的物件解綁時就被刪去了。
在RabbitMQ中,有四種不同的交換機。
一、Direct Exchange
Direct交換機根據訊息的路由鍵把訊息傳遞給佇列。路由鍵是生產者加到訊息頭部的一個訊息屬性。路由鍵可以被視作交換機用來決定怎麼路由這個訊息的一個地址。訊息會進入到binding key完全符合訊息的路由鍵的佇列中。
在你想要用一個簡單的字串識別符號來區分被髮布到同一個交換機上的訊息時, direct交換機就有用了。想象佇列A(下圖中的create_pdf_queue)被binding key pdf_create繫結在direct交換機 (pdf_events) ,當一個路由鍵為pdf_create的訊息到達這個direct交換機時,交換機把它路由到binding_key = routing_key的佇列中去,在這裡,也就是佇列A。
(也可以多個佇列用同一個routing key來繫結,這樣的話訊息會被髮送到所有的這些佇列中)
例子:
路由鍵為pdf_log的訊息被髮送到了交換機pdf_events,訊息被路由到pdf_log_queue因為路由鍵pdf_log符合binding key pdf_log。
如果訊息的路由鍵不符合任何的binding key,那麼這個訊息將被丟棄。
預設交換機
預設交換機是一個預先定義的direct交換機,沒有名字,經常被空字串“”來引用。 當你使用預設交換機時,你的訊息會被路由到名字和訊息的路由鍵相等的佇列中。每個佇列都自動繫結到預設交換機,binding key就是佇列的名字。
預設交換機的程式碼示例:
producer.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
print(" [x] Sent %r" % message)
connection.close()
worker.py:
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
二、Topic Exchange
Topic交換機根據路由鍵和routing pattern之間的萬用字元匹配來把訊息路由到一個或多個佇列中。routing pattern是queue binding所指定的(類似於上文的binding key)。 路由鍵必須是可以由“.”分割的一個字串,比如agreements.us和 agreements.eu.stockholm。routing pattern可能包含一個“*”來匹配路由鍵的某個位置。比如,"agreements.*.*.b.*"這個routing pattern將只會匹配第一個單詞為agreements以及第四個單詞為b的路由鍵。一個“#”匹配0個或多個單詞,比如"agreements.eu.berlin.#"這個routing pattern匹配任意以agreements.eu.berlin開頭的路由鍵。 消費者建立一個佇列,並用給定的routing pattern與交換機繫結。所有路由鍵符合routing pattern的訊息都會被路由到這個佇列中。 示例: 路由鍵為agreements.eu.berlin的訊息被髮送到了交換機agreements。 訊息被路由到佇列berlin_agreements因為routing pattern agreements.eu.berlin.# 與路由鍵匹配。訊息也被路由到了佇列all_agreements,因為路由鍵匹配routing pattern agreements.# 。示例程式碼: producer.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
consumer.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
三、Fanout Exchange
Fanout交換機把收到的訊息複製和路由到所有繫結它的佇列上,不管routing key/pattern。提供的那些Key都會被忽視。 Fanout在一個訊息需要倍傳送到一個或多個佇列中,用不同的方法去處理時,是有用的。示例程式碼:
emit_log.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
receive_logs.py:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
四、Headers Exchange
Headers交換機基於包括headers和可選值的引數來做路由。Headers交換機和Topic交換機非常像,但是它是基於header的值而不是路由鍵來做路由的。一條訊息被認為是匹配的,如果它header的值與繫結中指定的值相同的話。
一個叫做”x-match“的特殊的引數說明是否所有的header都必須匹配或者只需要有一個, 它有兩種值, 預設為"all",表示所有的header鍵值對都必須匹配;而"any"表示至少一個header鍵值對需要匹配。Headers可以被int或者string組成。
- 交換機與佇列A的繫結的引數: format = pdf, type = report
- 交換機與佇列B的繫結的引數: format = pdf, type = log
- 交換機與佇列C的繫結的引數: format = zip, type = report
如果沒有任何匹配的佇列,訊息會被丟棄。RabbitMQ提供了一個AMQP的拓展,"Dead Letter Exchange", 可以捕獲到那些沒有被成功傳達的訊息。
示例:
emitter.py:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='testing',
type='headers')
try:
while True:
fields = {}
data = input('>')
if '=' in data:
key, val = data.split('=')
fields[key] = val
continue
channel.basic_publish(exchange = 'testing',
routing_key = '',
body = data,
properties = \
pika.BasicProperties(headers = fields))
print (' [x] Send {0} with headers: {1}'.format(data, fields))
except KeyboardInterrupt:
print ('Bye')
finally:
connection.close()
receiver.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='testing',
type='headers')
result = channel.queue_declare(exclusive=True)
if not result:
print ('Queue didnt declare properly!')
sys.exit(1)
queue_name = result.method.queue
channel.queue_bind(exchange='testing',
queue = queue_name,
routing_key = '',
arguments = {'ham': 'good', 'x-match':'any'})
def callback(ch, method, properties, body):
print ("{headers}:{body}".format(headers = properties.headers,
body = body))
channel.basic_consume(callback,
queue = queue_name,
no_ack=True)
try:
channel.start_consuming()
except KeyboardInterrupt:
print ('Bye')
finally:
connection.close()