1. 程式人生 > >【EasyNetQ 教程】- 訂閱

【EasyNetQ 教程】- 訂閱

_id code rec actor 競爭 路由 easy get def

EasyNetQ訂閱者訂閱消息類型(消息類的.NET類型)。一旦通過調用Subscribe方法設置了對類型的訂閱,就會在RabbitMQ代理上創建一個持久隊列,並且該類型的任何消息都將被放置在隊列中。只要連接,RabbitMQ就會將任何消息從隊列發送給用戶。

要訂閱消息,我們需要為EasyNetQ提供在消息到達時執行的操作。我們通過傳遞訂閱委托來做到這一點:

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

現在每次發布MyMessage實例時,EasyNetQ都會調用我們的委托並將消息的Text屬性打印到控制臺。

您傳遞給Subscribe的訂閱ID很重要。EasyNetQ將在RabbitMQ代理上為消息類型和訂閱ID的每個唯一組合創建一個唯一的隊列。

每次調用Subscribe都會創建一個新的隊列使用者。如果使用相同的消息類型和訂閱ID調用“訂閱”兩次,則將創建兩個使用相同隊列的消費者。然後RabbitMQ將依次循環連續消息給每個消費者。這非常適合擴展和工作共享。假設您已經創建了一個處理特定消息的服務,但它的工作負擔過重。只需啟動該服務的新實例(在同一臺機器上,或在不同的機器上),無需配置任何內容,您就可以自動擴展。

如果使用不同的訂閱ID(但具有相同的消息類型)兩次調用“訂閱”,則將創建兩個隊列,每個隊列都有自己的使用者。

給定類型的每條消息的副本將被路由到每個隊列,因此每個消費者將獲得所有消息(該類型的消息)。如果你有幾個不同的服務都關心相同的消息類型,這是很好的。

編寫訂閱回調委托時的註意事項

當從通過EasyNetQ訂閱的隊列接收消息時,它們被放置在內存中隊列中。單個線程位於一個循環中,從隊列中獲取消息並調用其Action代理。由於在單個線程上一次處理一個委托,因此應避免長時間運行的同步IO操作。盡快從代表處返回控制權。

使用SubscribeAsync

SubscribeAsync允許您的訂閱者委托立即返回任務,然後異步執行長時間運行的IO操作。完成長期訂閱後,只需完成任務即可。在下面的示例中,我們使用異步IO操作(DownloadStringTask)向Web服務發出請求。

任務完成後,我們在控制臺上寫一行。

bus.SubscribeAsync<MyMessage>("subscribe_async_test", message => 
    new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
        .ContinueWith(task => 
            Console.WriteLine("Received: ‘{0}‘, Downloaded: ‘{1}‘", 
                message.Text, 
                task.Result)));

另一個示例將導致在出現錯誤時拋出異常,然後導致消息被置於缺省錯誤隊列中:

_bus.SubscribeAsync<MessageType>("Queue_Identifier",
            message => Task.Factory.StartNew(() =>
            {
                // Perform some actions here
                // If there is a exception it will result in a task complete but task faulted which
                // is dealt with below in the continuation
            }).ContinueWith(task =>
            {
                if (task.IsCompleted && !task.IsFaulted)
                {
                    // Everything worked out ok
                }
                else
                {                        
                    // Dont catch this, it is caught further up the heirarchy and results in being sent to the default error queue
                    // on the broker
                    throw new EasyNetQException("Message processing exception - look in the default error queue (broker)");
                }
            }));

取消訂閱

所有subscribe方法都返回一個ISubscriptionResult它包含描述底層IConsumerIExchangeIQueue使用的屬性,如果需要,可以使用高級API進一步操作這些屬性IAdvancedBus

您可以通過在ISubscriptionResult實例或其ConsumerCancellation屬性調用Dispose來隨時取消訂閱者

var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler);

...

subscriptionResult.Dispose();
// this is equivalent to subscriptionResult.ConsumerCancellation.Dispose();

這將阻止EasyNetQ從隊列中消耗並關閉消費者的頻道。

請註意,處理IBusIAdvancedBus實例也將取消所有使用者並關閉與RabbitMQ的連接。

難道不叫subscriptionResult.Dispose()一個消息處理程序內。這將在EasyNetQ確認消費者頻道上的消息subscriptionResult.Dispose()與關閉該頻道呼叫之間產生競爭條件由於EasyNetQ的內部架構,這些調用將在不同的線程上調用,並且時序不確定。

【EasyNetQ 教程】- 訂閱