1. 程式人生 > >(1)RabbitMQ簡介與安裝

(1)RabbitMQ簡介與安裝

1.RabbitMQ簡介

因為RabbitMQ是基於開源的AMQP協議來實現的,所以在瞭解MQ時候,首先我們來了解下AMQP協議。AMQP,即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受客戶端或者中介軟體不同產品、不同的開發語言等條件的限制,也就是說訊息生產者無需知道消費者如何處理訊息結果,反之亦然,解耦了元件跟元件依賴。RabbitMQ伺服器端用Erlang語言編寫,同時也支援多種客戶端來開發跨語言訊息傳遞,例如:Java,.NET,PHP,Python,JavaScript,Ruby,Go等。RabbitMQ還支援多種訊息傳遞協議、訊息排隊、傳遞確認、佇列的靈活路由、多種交換型別。還支援分散式叢集以實現高可用性和吞吐量。適用於排隊演算法、秒殺活動、訊息分發、非同步處理、資料同步、處理耗時任務、CQRS等應用場景。還可以通過HTTP-API命令列工具和用於管理和監視RabbitMQ的UI。

2.RabbitMQ在CentOS 7安裝

因為我對Linux運維知識面比較薄弱,所以在Linux上部署RabbitMQ這塊暫時不想耗太多時間在這上面去(後續有時間再深入瞭解),這裡我完全是跟著園區Net大神曉晨大佬這篇文章(https://www.cnblogs.com/stulzq/p/7551819.html)去部署的。網上也有很多RabbitMQ在Linux部署文章參考,大家也可以自行度娘。

//安裝服務端erlang語言
rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
//安裝socat
yum install socat
//安裝服務端RabbitMQ
rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm

3.RabbitMQ服務端常用命令

//啟用Web管理平臺
rabbitmq-plugins enable rabbitmq_management
//開啟服務
systemctl start rabbitmq-server.service
//停止服務
systemctl stop rabbitmq-server.service
//檢視服務狀態
systemctl status rabbitmq-server.service
//檢視RabbitMQ狀態
rabbitmqctl status
//新增使用者賦予管理員許可權
rabbitmqctl  add_user  tom  12345
rabbitmqctl  set_user_tags  tom  administrator
//檢視使用者列表
rabbitmqctl list_users
//刪除使用者
rabbitmqctl delete_user username
//修改使用者密碼
rabbitmqctl oldPassword Username newPassword

4.訪問RabbitMQ Web管理平臺

當啟用RabbitMQ Web管理平臺,我們根據部署CentOS 7系統的IP在瀏覽器上開啟http://IP:15672,如果新增了使用者,一定要設定新增使用者的VirtualHost的許可權,不然客戶端呼叫RabbitMQ時候會報錯!具體處理方法如下截圖:
//未設定許可權時


點解設定許可權即可。
如果訪問顯示404,則是防火牆把通訊給過濾掉了,請執行命令把防火牆關閉掉再開啟,以下我列出所有CentOS 7關於防火牆命令:

//檢視防火狀態
systemctl status firewalld
//暫時關閉防火牆
systemctl stop firewalld
//永久關閉防火牆
systemctl disable firewalld
//重啟防火牆
systemctl enable firewalld
//永久關閉後重啟
chkconfig iptables on 

關閉防火牆之後,在瀏覽器上就會看到下面管理平臺介面:

5.NET Core使用RabbitMQ

通過nuget安裝:https://www.nuget.org/packages/RabbitMQ.Client/

5.1定義生產者

class Program
{
    static void Main(string[] args)
    {
        string queueName = "DirectExchangeQueueName";
        string routeKey = "DirectExchangeQueueName";
        //建立連線工廠
        var factory = new ConnectionFactory
        {
            UserName = "dengwu",//使用者名稱
            Password = "123456",//密碼
            HostName = "192.168.112.133",//rabbitmq ip
        };

        //建立連線
        var connection = factory.CreateConnection();
        //建立通道
        var channel = connection.CreateModel();
        //宣告一個佇列
        channel.QueueDeclare(queueName, false, false, false, null);

        Console.WriteLine("\nRabbitMQ連線成功,請輸入訊息,輸入exit退出!");

        string input;
        do
        {
            input = Console.ReadLine();

            var sendBytes = Encoding.UTF8.GetBytes(input);
            //釋出訊息
            channel.BasicPublish("", routeKey, null, sendBytes);

        } while (input.Trim().ToLower() != "exit");
        channel.Close();
        connection.Close();
    }
}

5.2定義消費者

class Program
{
    static void Main(string[] args)
    {
        string queueName = "DirectExchangeQueueName";
        //建立連線工廠
        var factory = new ConnectionFactory
        {
            UserName = "dengwu",//使用者名稱
            Password = "123456",//密碼
            HostName = "192.168.112.133",//rabbitmq ip
        };

        //建立連線
        var connection = factory.CreateConnection();
        //建立通道
        var channel = connection.CreateModel();
        //事件基本消費者
        var consumer = new EventingBasicConsumer(channel);

        //接收到訊息事件
        consumer.Received += (ch, ea) =>
        {
            var boby = ea.Body;
            var message = Encoding.UTF8.GetString(boby.ToArray());

            Console.WriteLine($"收到訊息: {message}");
            //確認該訊息已被消費
            channel.BasicAck(ea.DeliveryTag, false);
            //Console.WriteLine($"收到該訊息[{ea.DeliveryTag}] 延遲10s傳送回執");
            //Thread.Sleep(10000);
            //Console.WriteLine($"已傳送回執[{ea.DeliveryTag}]");
        };
        //啟動消費者 設定為手動應答訊息
        channel.BasicConsume(queueName, false, consumer);
        Console.WriteLine("消費者已啟動");
        Console.ReadKey();
        channel.Dispose();
        connection.Close();
    }
}

執行:

通過啟動一個生產者,一個消費者,我們可以看到,生產者通過RabbitMQ決定投遞訊息給對應消費者。

5.3RabbitMQ消費失敗的處理

RabbitMQ採用訊息應答機制,即消費者收到一個訊息之後,需要傳送一個應答,然後RabbitMQ才會將這個訊息從佇列中刪除,如果消費者在消費過程中出現異常,斷開連線沒有傳送應答,那麼RabbitMQ會將這個訊息重新投遞,下面我們將消費者接收到訊息事件程式碼修改如下:

//接收到訊息事件
consumer.Received += (ch, ea) =>
{
    var boby = ea.Body;
    var message = Encoding.UTF8.GetString(boby.ToArray());

    Console.WriteLine($"收到該訊息[{ea.DeliveryTag}] 延遲10s傳送回執");
    Thread.Sleep(10000);
    Console.WriteLine($"已傳送回執[{ea.DeliveryTag}]");
};

先在生產者裡面預先傳遞三個訊息:


如果我們設定了訊息應答延遲10s,如果在這10s中,該消費者斷開了連線,那麼訊息會被RabbitMQ重新投遞的。具體大家可以自行測試。

6.總結

該章節主要簡單介紹RabbitMQ概念在Linux上簡單部署,接下來章節,我會陸續介紹AMQP Messaging中的基本概念跟Exchange(交換機)。

參考文獻:
RabbitMQ官網
.NET Core 使用Rabb