C#基於Modbus三種CRC16校驗方法的效能對比
阿新 • • 發佈:2020-11-12
-
rabbitmq安裝
-
使用docker搜尋、拉取映象、執行為容器
docker search rabbitmq docker pull rabbitmq 若不指定版本,預設拉取最新的版本 docker run -d --name rabbit -p 5672:5672 -p 15672:15672 --hostname my-rabbit -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123 458123c67b79 最後為rabbit的映象ID
-
-
RabbitMQ概述
- 有一些業務邏輯是不能立刻完成,會阻塞程式,類似於傳送簡訊、郵件,這些都需要伺服器給第三方發起請求,不能立刻得到結果。這就會造成使用者操作過程停滯,所以解決辦法就是將這些耗時操作,放到佇列中去非同步執行。
- 有三個重要的組成部分,生產者、佇列、消費者。生產者就是web後端程式,佇列使用rabbitmq,消費者就是向第三方發起請求。
-
RabbitMQ整體架構
- rabbitmq-server內部包括兩部分:
- 交換機 將生產者傳遞過來的訊息,根據自身的要求,轉發給佇列
- 佇列(先進先出) 佇列與消費者事先有協議,消費者會從固定的佇列中取得要執行的任務。
- rabbitmq-server內部包括兩部分:
-
生產者的實現
- 整個過程需要5步
import pika # 1. 獲得與rabbitmq代理連線物件 connection_host = '192.168.1.38' connection_credentials = pika.PlainCredentials('root', '123') connection = pika.BlockingConnection( pika.ConnectionParameters(host=connection_host, credentials=connection_credentials)) # 2. 通過連線物件,獲得channel(管道)物件,用於操作rabbitmq channel = connection.channel() # 3. 建立名字my_queue的訊息佇列,如果存在不建立 channel.queue_declare('my_queue') # 4. 傳送訊息到rabbitmq中名字為my_queue的佇列中 channel.basic_publish(exchange='', routing_key='my_queue', body='hello word') # 5. 關閉和rabbitmq代理的連線 connection.close() print('訊息傳送完畢!')
-
消費者的實現
- 最後一步開啟監聽後,佇列中一旦加入任務,就會被消費者取走,並執行處理函式。
import pika # 1. 獲取與rabbitmq的連線物件 connection_host = '192.168.1.38' connection_credentials = pika.PlainCredentials('root', '123') connection = pika.BlockingConnection(pika.ConnectionParameters( host=connection_host, credentials=connection_credentials)) # 2. 通過連線物件獲得channel物件,用於操作rabbitmq channel = connection.channel() # 3. 建立名字為my_queue的訊息佇列,如果不存在就建立 channel.queue_declare(queue='my_queue') # 4. 按照rabbitmq要求定義訊息處理函式 def callback(ch, method, properties, body): print('接收到的訊息是:', body) # 此刻接收到的訊息是二進位制格式 # 5. 關聯佇列,並設定佇列中的訊息處理函式 # channel.basic_consume(callback, queue='my_queue', no_ack=True) 以前版本的寫法,後改為下面方式 channel.basic_consume('my_queue', callback, False) # 6. 啟動並開始處理訊息,該程式會一直執行,進行監聽 channel.start_consuming()
-
任務佇列
task.py檔案中內容 import time def send_email(): print('開始傳送郵件') time.sleep(3) print('郵件傳送完畢') def send_message(): print('開始傳送簡訊') time.sleep(3) print('簡訊傳送完畢') consumer.py檔案中內容 task_list = { 'email': task.send_email, 'message': task.send_message } # 任務後不可以加(),否則會立即執行 # 4. 按照rabbitmq要求定義訊息處理函式 def callback(ch, method, properties, body): task_name = body.decode() # 判斷生產者傳送的訊息是否在消費者中註冊過,如果沒有註冊過就提示錯誤 if task_name not in task_list: print('error:{}任務沒有註冊'.format(task_name)) return task_list[task_name]() producter.py檔案中的內容 # 4. 傳送訊息到rabbitmq中名字為my_queue的佇列中 channel.basic_publish(exchange='', routing_key='work_queue', body='message') 只需要改變body中的內容,就可以生產出不同的任務到佇列中去
-
訊息確認機制
-
原因:會出現的意外情況:消費者取到任務以後,並未執行完成任務,就死了。
-
rabbitmq預設會在將訊息傳送給消費者以後,會將任務從佇列中刪掉。
-
使用訊息確認機制,若消費者意外死亡,則不能給佇列反饋,佇列就不會刪除被該消費者取走的任務。
-
實現方法:
def callback(ch, method, properties, body): task_name = body.decode() # 判斷生產者傳送的訊息是否在消費者中註冊過,如果沒有註冊過就提示錯誤 if task_name not in task_list: print('error:{}任務沒有註冊'.format(task_name)) return task_list[task_name]() # 訊息處理完成後,確認訊息 ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume('work_queue', callback, True)
-
-
迴圈排程機制
- 預設是迴圈排程機制,就是消費之輪流去佇列中取任務
-
公平排程機制---在consumer檔案中設定公平排程
-
訊息確認機制開啟
-
設定公平排程機制,消費者不確認,就不要再給該消費之任務了,因為他目前的耗時任務還在執行,可以把任務給其他已經確認了的消費者。
在消費者中設定公平排程機制 channel.basic_qos(prefetch_count=1)
-
-
佇列及其中的訊息持久化---在product檔案中設定佇列及訊息持久化
-
重啟rabbitmq伺服器會使佇列和任務消失
-
解決方法:
-
在product檔案中,生成佇列的程式碼中加引數
# 3. 建立名字my_queue的訊息佇列,如果存在不建立 channel.queue_declare('work_queue', durable=True) durable=True就可以保持,佇列的持久化 # 4. 傳送訊息到rabbitmq中名字為my_queue的佇列中 channel.basic_publish(exchange='', routing_key='work_queue', body='message', properties=pika.BasicProperties(delivery_mode=2)) # properties=pika.BasicProperties(delivery_mode=2)可以保持訊息持久化
-
-
-
交換機的三種模式
生產者 import pika # 1. 獲得與rabbitmq代理連線物件 connection_host = '192.168.1.38' connection_credentials = pika.PlainCredentials('root', '123') connection = pika.BlockingConnection( pika.ConnectionParameters(host=connection_host, credentials=connection_credentials)) # 2. 通過連線物件,獲得channel(管道)物件,用於操作rabbitmq channel = connection.channel() # 3. 建立名字my_queue的訊息佇列,如果存在不建立 channel.queue_declare('work_queue', durable=True) # 4. 傳送訊息到rabbitmq中名字為my_queue的佇列中 channel.basic_publish(exchange='', routing_key='work_queue', body='message', properties=pika.BasicProperties(delivery_mode=2)) # 只要生產者,傳送不同的body,就會消費者中的處理函式,就會呼叫不同的task # 5. 關閉和rabbitmq代理的連線 connection.close() print('訊息傳送完畢!') 消費者 import pika import task task_list = { 'email': task.send_email, 'message': task.send_message } # 任務後不可以加(),否則會立即執行 # 1. 獲取與rabbitmq的連線物件 connection_host = '192.168.1.38' connection_credentials = pika.PlainCredentials('root', '123') connection = pika.BlockingConnection(pika.ConnectionParameters( host=connection_host, credentials=connection_credentials)) # 2. 通過連線物件獲得channel物件,用於操作rabbitmq channel = connection.channel() # 3. 建立名字為my_queue的訊息佇列,如果不存在就建立 channel.queue_declare(queue='work_queue') # 4. 按照rabbitmq要求定義訊息處理函式 def callback(ch, method, properties, body): task_name = body.decode() # 判斷生產者傳送的訊息是否在消費者中註冊過,如果沒有註冊過就提示錯誤 if task_name not in task_list: print('error:{}任務沒有註冊'.format(task_name)) return task_list[task_name]() # 訊息處理完成後,確認訊息 ch.basic_ack(delivery_tag=method.delivery_tag) # 設定公平排程機制 channel.basic_qos(prefetch_count=1) # 5. 關聯佇列,並設定佇列中的訊息處理函式 # channel.basic_consume(callback, queue='my_queue', no_ack=True) 以前版本的寫法,後改為下面方式 channel.basic_consume('work_queue', callback, False) # 6. 啟動並開始處理訊息 channel.start_consuming() 任務 import time def send_email(): print('開始傳送郵件') time.sleep(1) print('郵件傳送完畢') def send_message(): print('開始傳送簡訊') time.sleep(1) print('簡訊傳送完畢')