搭建 RabbitMQ 叢集
阿新 • • 發佈:2020-11-24
RabbitMQ 搭建可以參考:https://www.cnblogs.com/klvchen/p/14029336.html
官網叢集介紹:https://www.rabbitmq.com/clustering.html
測試架構
名稱 | IP |
---|---|
node-1 | 192.168.0.100 |
node-2 | 192.168.0.101 |
# 在 node-1, node-2 上操作,加入 hosts vi /etc/hosts 192.168.0.100 node-1 192.168.0.101 node-2 # 在 node-2 上操作,關閉 rabbitmq 叢集 rabbitmqctl stop rabbitmqctl status # 在 node-1 上操作 cd cat .erlang.cookie # 在 node-2 上操作,需要與 node-1 的 .erlang.cookie 值相同 cd vi .erlang.cookie rabbitmq-server -detached # 停止節點 rabbit@node-2 rabbitmqctl stop_app # 重置節點 rabbit@node-2 rabbitmqctl reset # 建立節點 rabbit@node-2 with [rabbit@node-1] rabbitmqctl join_cluster rabbit@node-1 # 啟動節點 rabbit@node-2 rabbitmqctl start_app # 檢視叢集狀態 rabbitmqctl cluster_status # 映象模式,https://www.rabbitmq.com/rabbitmqctl.8.html#set_policy rabbitmqctl set_policy -p / ha "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}' # 刪除主節點(node-1),在 node2 節點上操作 rabbitmqctl -n rabbit@node-2 forget_cluster_node rabbit@node-1
叢集名稱
叢集名稱預設是叢集中第一個節點的名稱,通過這個命令可以重新設定。
叢集名稱在客戶端連線時會通報給客戶端。Federation和Shovel外掛也會有用到叢集名稱的地方。
# 設定叢集名稱
rabbitmqctl set_cluster_name cluster_klvchen
映象模式
rabbitmqctl set_policy -p / ha "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
刪除節點(node-1),在 node-2 節點上操作
rabbitmqctl -n rabbit@node-2 forget_cluster_node rabbit@node-1
使用 python 程式碼測試
建立 publish.py 傳送到 RabbitMQ
import pika # socket連線 credentials = pika.PlainCredentials('admin', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.101', 28003, credentials=credentials)) my_queue = "task_mess" # 申明一個管道 channel = connection.channel() # 給管道里面申明一個queue channel.queue_declare(queue=my_queue, durable=True) # 通過管道傳送訊息 for l in range(0, 101): msg = "OderID: %d"%l channel.basic_publish(exchange='', routing_key=my_queue, body=msg) print("訊息以及傳送到subscribe端") connection.close()
建立 subscribe.py 傳送到 RabbitMQ
import pika
# socket連線
credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.101', 28003, credentials=credentials))
my_queue = "task_mess"
# 申明一個管道
channel = connection.channel()
# 給管道里面申明一個queue
channel.queue_declare(queue=my_queue, durable=True)
def callback(ch, method, properties, body):
print("subscribe端已經接收到訊息正在處理~~~", body)
# 管道接收訊息
channel.basic_consume(callback, queue=my_queue, no_ack=True)
channel.start_consuming()