【EasyNetQ 教程】- 訂閱
EasyNetQ訂閱者訂閱消息類型(消息類的.NET類型)。一旦通過調用Subscribe方法設置了對類型的訂閱,就會在RabbitMQ代理上創建一個持久隊列,並且該類型的任何消息都將被放置在隊列中。只要連接,RabbitMQ就會將任何消息從隊列發送給用戶。
要訂閱消息,我們需要為EasyNetQ提供在消息到達時執行的操作。我們通過傳遞訂閱委托來做到這一點:
bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));
現在每次發布MyMessage實例時,EasyNetQ都會調用我們的委托並將消息的Text屬性打印到控制臺。
您傳遞給Subscribe的訂閱ID很重要。EasyNetQ將在RabbitMQ代理上為消息類型和訂閱ID的每個唯一組合創建一個唯一的隊列。
每次調用Subscribe都會創建一個新的隊列使用者。如果使用相同的消息類型和訂閱ID調用“訂閱”兩次,則將創建兩個使用相同隊列的消費者。然後RabbitMQ將依次循環連續消息給每個消費者。這非常適合擴展和工作共享。假設您已經創建了一個處理特定消息的服務,但它的工作負擔過重。只需啟動該服務的新實例(在同一臺機器上,或在不同的機器上),無需配置任何內容,您就可以自動擴展。
如果使用不同的訂閱ID(但具有相同的消息類型)兩次調用“訂閱”,則將創建兩個隊列,每個隊列都有自己的使用者。
編寫訂閱回調委托時的註意事項
當從通過EasyNetQ訂閱的隊列接收消息時,它們被放置在內存中隊列中。單個線程位於一個循環中,從隊列中獲取消息並調用其Action代理。由於在單個線程上一次處理一個委托,因此應避免長時間運行的同步IO操作。盡快從代表處返回控制權。
使用SubscribeAsync
SubscribeAsync允許您的訂閱者委托立即返回任務,然後異步執行長時間運行的IO操作。完成長期訂閱後,只需完成任務即可。在下面的示例中,我們使用異步IO操作(DownloadStringTask)向Web服務發出請求。
bus.SubscribeAsync<MyMessage>("subscribe_async_test", message => new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500")) .ContinueWith(task => Console.WriteLine("Received: ‘{0}‘, Downloaded: ‘{1}‘", message.Text, task.Result)));
另一個示例將導致在出現錯誤時拋出異常,然後導致消息被置於缺省錯誤隊列中:
_bus.SubscribeAsync<MessageType>("Queue_Identifier", message => Task.Factory.StartNew(() => { // Perform some actions here // If there is a exception it will result in a task complete but task faulted which // is dealt with below in the continuation }).ContinueWith(task => { if (task.IsCompleted && !task.IsFaulted) { // Everything worked out ok } else { // Dont catch this, it is caught further up the heirarchy and results in being sent to the default error queue // on the broker throw new EasyNetQException("Message processing exception - look in the default error queue (broker)"); } }));
取消訂閱
所有subscribe方法都返回一個ISubscriptionResult。它包含描述底層IConsumerIExchange
和IQueue
使用的屬性,如果需要,可以使用高級API進一步操作這些屬性。IAdvancedBus
您可以通過在ISubscriptionResult
實例或其ConsumerCancellation
屬性上調用Dispose來隨時取消訂閱者:
var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler); ... subscriptionResult.Dispose(); // this is equivalent to subscriptionResult.ConsumerCancellation.Dispose();
這將阻止EasyNetQ從隊列中消耗並關閉消費者的頻道。
請註意,處理IBus
或IAdvancedBus
實例也將取消所有使用者並關閉與RabbitMQ的連接。
難道不叫subscriptionResult.Dispose()
一個消息處理程序內。這將在EasyNetQ確認消費者頻道上的消息subscriptionResult.Dispose()
與關閉該頻道的呼叫之間產生競爭條件。由於EasyNetQ的內部架構,這些調用將在不同的線程上調用,並且時序不確定。
【EasyNetQ 教程】- 訂閱