1. 程式人生 > 程式設計 >C#基於Modbus三種CRC16校驗方法的效能對比

C#基於Modbus三種CRC16校驗方法的效能對比

  1. rabbitmq安裝

    • 使用docker搜尋、拉取映象、執行為容器

      docker search rabbitmq
      docker pull rabbitmq   若不指定版本,預設拉取最新的版本
      docker run -d --name rabbit -p 5672:5672 -p 15672:15672 --hostname my-rabbit -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123 458123c67b79 最後為rabbit的映象ID
      
  2. RabbitMQ概述

    • 有一些業務邏輯是不能立刻完成,會阻塞程式,類似於傳送簡訊、郵件,這些都需要伺服器給第三方發起請求,不能立刻得到結果。這就會造成使用者操作過程停滯,所以解決辦法就是將這些耗時操作,放到佇列中去非同步執行。
    • 有三個重要的組成部分,生產者、佇列、消費者。生產者就是web後端程式,佇列使用rabbitmq,消費者就是向第三方發起請求。
  3. RabbitMQ整體架構

    • rabbitmq-server內部包括兩部分:
      • 交換機 將生產者傳遞過來的訊息,根據自身的要求,轉發給佇列
      • 佇列(先進先出) 佇列與消費者事先有協議,消費者會從固定的佇列中取得要執行的任務。
  4. 生產者的實現

    • 整個過程需要5步
    import pika
    
    # 1. 獲得與rabbitmq代理連線物件
    connection_host = '192.168.1.38'
    connection_credentials = pika.PlainCredentials('root', '123')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=connection_host, credentials=connection_credentials))
    
    # 2. 通過連線物件,獲得channel(管道)物件,用於操作rabbitmq
    channel = connection.channel()
    
    # 3. 建立名字my_queue的訊息佇列,如果存在不建立
    channel.queue_declare('my_queue')
    
    # 4. 傳送訊息到rabbitmq中名字為my_queue的佇列中
    channel.basic_publish(exchange='', routing_key='my_queue', body='hello word')
    
    # 5. 關閉和rabbitmq代理的連線
    connection.close()
    
    print('訊息傳送完畢!')
    
  5. 消費者的實現

    • 最後一步開啟監聽後,佇列中一旦加入任務,就會被消費者取走,並執行處理函式。
    import pika
    
    # 1. 獲取與rabbitmq的連線物件
    connection_host = '192.168.1.38'
    connection_credentials = pika.PlainCredentials('root', '123')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=connection_host, credentials=connection_credentials))
    
    # 2. 通過連線物件獲得channel物件,用於操作rabbitmq
    channel = connection.channel()
    
    # 3. 建立名字為my_queue的訊息佇列,如果不存在就建立
    channel.queue_declare(queue='my_queue')
    
    
    # 4. 按照rabbitmq要求定義訊息處理函式
    def callback(ch, method, properties, body):
        print('接收到的訊息是:', body)
        # 此刻接收到的訊息是二進位制格式
    
    # 5. 關聯佇列,並設定佇列中的訊息處理函式
    # channel.basic_consume(callback, queue='my_queue', no_ack=True)  以前版本的寫法,後改為下面方式
    channel.basic_consume('my_queue', callback, False)
    
    # 6. 啟動並開始處理訊息,該程式會一直執行,進行監聽
    channel.start_consuming()
    
    
  6. 任務佇列

    task.py檔案中內容
    
    import time
    
    def send_email():
        print('開始傳送郵件')
        time.sleep(3)
        print('郵件傳送完畢')
    
    
    def send_message():
        print('開始傳送簡訊')
        time.sleep(3)
        print('簡訊傳送完畢')
        
        
    consumer.py檔案中內容
    task_list = {
        'email': task.send_email,
        'message': task.send_message
    }
    # 任務後不可以加(),否則會立即執行
    # 4. 按照rabbitmq要求定義訊息處理函式
    def callback(ch, method, properties, body):
        task_name = body.decode()
        # 判斷生產者傳送的訊息是否在消費者中註冊過,如果沒有註冊過就提示錯誤
        if task_name not in task_list:
            print('error:{}任務沒有註冊'.format(task_name))
            return
        task_list[task_name]()
    
    producter.py檔案中的內容
    # 4. 傳送訊息到rabbitmq中名字為my_queue的佇列中
    channel.basic_publish(exchange='', routing_key='work_queue', body='message')
    只需要改變body中的內容,就可以生產出不同的任務到佇列中去
    
  7. 訊息確認機制

    1. 原因:會出現的意外情況:消費者取到任務以後,並未執行完成任務,就死了。

    2. rabbitmq預設會在將訊息傳送給消費者以後,會將任務從佇列中刪掉。

    3. 使用訊息確認機制,若消費者意外死亡,則不能給佇列反饋,佇列就不會刪除被該消費者取走的任務。

    4. 實現方法:

      def callback(ch, method, properties, body):
          task_name = body.decode()
          # 判斷生產者傳送的訊息是否在消費者中註冊過,如果沒有註冊過就提示錯誤
          if task_name not in task_list:
              print('error:{}任務沒有註冊'.format(task_name))
              return
          task_list[task_name]()
          # 訊息處理完成後,確認訊息
          ch.basic_ack(delivery_tag=method.delivery_tag)
          
         
      channel.basic_consume('work_queue', callback, True)
      
  8. 迴圈排程機制

    1. 預設是迴圈排程機制,就是消費之輪流去佇列中取任務
  9. 公平排程機制---在consumer檔案中設定公平排程

    1. 訊息確認機制開啟

    2. 設定公平排程機制,消費者不確認,就不要再給該消費之任務了,因為他目前的耗時任務還在執行,可以把任務給其他已經確認了的消費者。

      在消費者中設定公平排程機制
      channel.basic_qos(prefetch_count=1)
      
  10. 佇列及其中的訊息持久化---在product檔案中設定佇列及訊息持久化

    1. 重啟rabbitmq伺服器會使佇列和任務消失

    2. 解決方法:

      • 在product檔案中,生成佇列的程式碼中加引數

        # 3. 建立名字my_queue的訊息佇列,如果存在不建立
        channel.queue_declare('work_queue', durable=True)
        durable=True就可以保持,佇列的持久化
        
        # 4. 傳送訊息到rabbitmq中名字為my_queue的佇列中
        channel.basic_publish(exchange='', routing_key='work_queue', body='message', properties=pika.BasicProperties(delivery_mode=2))
        # properties=pika.BasicProperties(delivery_mode=2)可以保持訊息持久化
        
        
  11. 交換機的三種模式

    生產者
    import pika
    
    # 1. 獲得與rabbitmq代理連線物件
    connection_host = '192.168.1.38'
    connection_credentials = pika.PlainCredentials('root', '123')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=connection_host, credentials=connection_credentials))
    
    # 2. 通過連線物件,獲得channel(管道)物件,用於操作rabbitmq
    channel = connection.channel()
    
    # 3. 建立名字my_queue的訊息佇列,如果存在不建立
    channel.queue_declare('work_queue', durable=True)
    
    # 4. 傳送訊息到rabbitmq中名字為my_queue的佇列中
    channel.basic_publish(exchange='', routing_key='work_queue', body='message', properties=pika.BasicProperties(delivery_mode=2))
    # 只要生產者,傳送不同的body,就會消費者中的處理函式,就會呼叫不同的task
    
    # 5. 關閉和rabbitmq代理的連線
    connection.close()
    
    print('訊息傳送完畢!')
    
    
    
    消費者
    import pika
    import task
    
    task_list = {
        'email': task.send_email,
        'message': task.send_message
    }
    # 任務後不可以加(),否則會立即執行
    
    # 1. 獲取與rabbitmq的連線物件
    connection_host = '192.168.1.38'
    connection_credentials = pika.PlainCredentials('root', '123')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=connection_host, credentials=connection_credentials))
    
    # 2. 通過連線物件獲得channel物件,用於操作rabbitmq
    channel = connection.channel()
    
    # 3. 建立名字為my_queue的訊息佇列,如果不存在就建立
    channel.queue_declare(queue='work_queue')
    
    
    # 4. 按照rabbitmq要求定義訊息處理函式
    def callback(ch, method, properties, body):
        task_name = body.decode()
        # 判斷生產者傳送的訊息是否在消費者中註冊過,如果沒有註冊過就提示錯誤
        if task_name not in task_list:
            print('error:{}任務沒有註冊'.format(task_name))
            return
        task_list[task_name]()
        # 訊息處理完成後,確認訊息
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    # 設定公平排程機制
    channel.basic_qos(prefetch_count=1)
    
    # 5. 關聯佇列,並設定佇列中的訊息處理函式
    # channel.basic_consume(callback, queue='my_queue', no_ack=True)  以前版本的寫法,後改為下面方式
    channel.basic_consume('work_queue', callback, False)
    
    # 6. 啟動並開始處理訊息
    channel.start_consuming()
    
    
    任務
    import time
    
    def send_email():
        print('開始傳送郵件')
        time.sleep(1)
        print('郵件傳送完畢')
    
    
    def send_message():
        print('開始傳送簡訊')
        time.sleep(1)
        print('簡訊傳送完畢')