1. 程式人生 > 其它 >一、RabbitMQ 概念詳解和應用

一、RabbitMQ 概念詳解和應用

訊息佇列和同步請求的區別

無論RabbitMQ還是Kafka,本質上都是提供了基於message或事件驅動非同步處理業務的能力,相比於http和rpc的直接呼叫,它有著不可替代的優勢:

1. 解耦,解耦的一個最常見做法就是在服務之間新增一層,使原來直接依賴的A,B service 鬆耦合,這在微服務架構中尤為重要。

2. 流量控制:通過訊息佇列意味著我們可以監控時間段內的需要處理的業務量,對於明顯超出服務承受能力的請求,我們可以延遲處理或者拒絕處理,保證服務的穩定性與可用性。

3. 負載均衡:可以通過訊息代理實現下游服務的負載均衡,有利於保證服務的高可用性;

訊息佇列協議介紹 AMQP

協議:AMQP,即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。

RabbitMQ模型:

1、Server:又稱 broker,接受客戶端連線,實現 AMQP 實體服務。
2、Connection:連線和具體 broker 網路連線。
3、Channel:網路通道,幾乎所有操作都在 channel 中進行,channel 是訊息讀寫的通道。客戶端可以建立多個 channel,每個 channel 表示一個會話任務。
4、message:訊息,伺服器和應用程式之間傳遞的資料,由 properties 和 body 組成。properties 可以對訊息進行修飾,比如訊息的優先順序,延遲等高階特性;body 是訊息實體內容。
5、Virtual host:虛擬主機,用於邏輯隔離,最上層訊息的路由。一個 Virtual host 可以若干個 Exchange 和 Queue,同一個 Virtual host 不能有同名的 Exchange 或 Queue。
6、Exchange:交換機,接受訊息,根據路由鍵轉發訊息到繫結的佇列上。
7、Binding:Exchange 和 Queue 之間的虛擬連線,binding 中可以包括 routing key。
8、Routing key:一個路由規則,虛擬機器根據他來確定如何路由 一條訊息。
9、Queue:訊息佇列,用來存放訊息的佇列。

Exchange 各種型別

交換機的型別,direct、topic、fanout、headers;

1、Direct Exchange,所有傳送到 Direct Exchange 的訊息被轉發到 RouteKey 中指定的 Queue, Direct Exchange 可以使用預設的預設的 Exchange (default Exchange),預設的 Exchange 會繫結所有的佇列,所以 Direct 可以直接使用 Queue 名(作為routing key )繫結。或者消費者和生產者的 routing key 完全匹配。

2、Toptic Exchange,是指傳送到 Topic Exchange 的訊息被轉發到所有關心的 Routing key 中指定 topic 的 Queue 上。Exchange 將 routing key 和某 Topic 進行模糊匹配,此時佇列需要繫結一個 Topic。所謂模糊匹配就是可以使用萬用字元,“#”可以匹配一個或多個詞,“*”只匹配一個詞。比如“log.#”可以匹配“log.info.test”, "log.*"就只能匹配 log.error。

3、Fanout Exchange:不處理路由鍵,只需簡單的將佇列繫結到交換機上。傳送到該交換機上的訊息都會被髮送到與該交換機繫結的佇列上。Fanout 轉發是最快的。

總而言之:Direct change是嚴格意義上的匹配,routing key 與 binding key 完全一樣;Fanout Exchange 是完全不關心你的routing key,向所有繫結的queue 全部發送;這是兩個極端,那麼Toptic Exchange 就相對Direct change寬鬆一些,它的訊息投遞取決於模糊匹配的結果。

最後介紹headers模式:

4. Headers Exchange:

headers型別的Exchange不依賴於routing key與binding key的匹配規則來路由訊息,而是根據傳送的訊息內容中的headers屬性進行匹配。
在繫結Queue與Exchange時指定一組鍵值對;當訊息傳送到Exchange時,RabbitMQ會取到該訊息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配Queue與Exchange繫結時指定的鍵值對;如果完全匹配則訊息會路由到該Queue,否則不會路由到該Queue。

通過圖來表示:

RabbitMQ 單機安裝與使用

windows 系統下,安裝教程傳送門(https://www.cnblogs.com/saryli/p/9729591.html)

linux 系統下,安裝教程傳送門(https://www.linuxprobe.com/install-rabbitmq-on-centos-7.html)

(會在後面的系列介紹高可用叢集的搭建)

這裡以dotnet 為例,示例如何使用 rabbitmq。

Producer End:

    public class MessageQueueService
    {
        private IConnection _connection;
        private IModel _channel;
        private const string TicketExchangeName = "tickets";
        public MessageQueueService()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            _connection = factory.CreateConnection();
            _channel = GetChannel();
            _channel.ExchangeDeclare(TicketExchangeName, ExchangeType.Fanout, false, false);
        }

        public void SendTicketMessage(Ticket message, string optType)
        {
            
            var messageBytes = ObjectToByteArray(message);
            _channel.BasicPublish(TicketExchangeName, optType, false, null, messageBytes);
        }
     }

Consumer End:

    public class TicketConsumerService : IHostedService
    {
        private IConnection _connection;
        private IModel _channel;
        private const string TicketExchangeName = "tickets";
        private string queueName = "";
        public TicketConsumerService()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();
            _channel.ExchangeDeclare(TicketExchangeName, ExchangeType.Fanout, false, false);
        }


        Task IHostedService.StartAsync(CancellationToken cancellationToken)
        {
            queueName =  _channel.QueueDeclare().QueueName;
            _channel.QueueBind(queueName, TicketExchangeName, "buy");
            var consumer = new EventingBasicConsumer(_channel);
            consumer.Received += (model, ea) =>
            {
                Ticket message = (Ticket)ByteArrayToObject(ea.Body.ToArray());
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'",
                    routingKey, message.Boarding);
            };
            _channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
            return Task.CompletedTask;
        }

        Task IHostedService.StopAsync(CancellationToken cancellationToken)
        {
            _connection.Dispose();
            _channel.Dispose();
            
            return Task.CompletedTask;
        }

        private Object ByteArrayToObject(byte[] arrBytes)
        {
            MemoryStream memStream = new MemoryStream();
            BinaryFormatter binForm = new BinaryFormatter();
            memStream.Write(arrBytes, 0, arrBytes.Length);
            memStream.Seek(0, SeekOrigin.Begin);
            Object obj = (Object)binForm.Deserialize(memStream);

            return obj;
        }
    }

這篇主要介紹了一些Message Queue的概念及如何在web 開發過程中使用。下一篇會介紹RabbitMQ的一些進階特性和使用場景。

--------------------------------------------

歡迎大家留言討論,指出錯誤和不當之處!