1. 程式人生 > >EasyNetQ自定義異常訊息處理

EasyNetQ自定義異常訊息處理

  20140310補充:

  rabbitmq有requeue屬性,可以選擇訊息是否返回佇列,另,本文的解決方式非常之山寨,只能應用於傳送和接收方式。  

  這幾天在折騰訊息佇列,在.Net環境下有基於RabbitMQ有很多有API的選擇,最後選擇了比較簡單的EasyNetQ(http://easynetq.com/)

  在測試使用的時候發現一個問題,對於處理錯誤的訊息,EasyNetQ預設會放到一個錯誤佇列中並提供了一個工具可以對錯誤佇列的訊息進行重新分發。RabiitMQ是有一個事務功能的,但是貌似在EasyNetQ的實現中,沒有發現相關的操作方式

  現在的需求是,在某種特定的情況下,需要將處理不成功的訊息,保留在訊息原佇列

  做了一個變通處理,出現錯誤訊息的時候,將其再送回原佇列。從客戶端上可以直接send回佇列,但這樣不是一個很好的方式

  研究了一下EasyNetQ的相關程式碼,發現其定義了一個IConsumerErrorStrategy介面,我們只要自己自行實現,並註冊一個自定義方法就可以

  ComponentRegistration.cs原註冊方法相關,預設錯誤訊息處理方式在DefaultConsumerErrorStrategy.cs中

 1     public class ComponentRegistration
 2     {
 3         public static
void RegisterServices(IContainer container) 4 { 5 Preconditions.CheckNotNull(container, "container"); 6 7 // Note: IConnectionConfiguration gets registered when MQContext.CreateBus(..) is run. 8 #region DefaultConsumerErrorStrategy 9 container
10 .Register(_ => container) 11 .Register<IMessageQueueQLogger, ConsoleLogger>() 12 .Register<ISerializer, JsonSerializerN>() 13 .Register<IConventions, Conventions>() 14 .Register<IEventBus, EventBus>() 15 .Register<ITypeNameSerializer, TypeNameSerializer>() 16 .Register<Func<string>>(x => CorrelationIdGenerator.GetCorrelationId) 17 .Register<IClusterHostSelectionStrategy<ConnectionFactoryInfo>, DefaultClusterHostSelectionStrategy<ConnectionFactoryInfo>>() 18 .Register<IConsumerDispatcherFactory, ConsumerDispatcherFactory>() 19 .Register<IPublishExchangeDeclareStrategy, PublishExchangeDeclareStrategy>() 20 .Register<IPublisherConfirms, PublisherConfirms>() 21 .Register<IConsumerErrorStrategy, DefaultConsumerErrorStrategy>() 22 .Register<IHandlerRunner, HandlerRunner>() 23 .Register<IInternalConsumerFactory, InternalConsumerFactory>() 24 .Register<IConsumerFactory, ConsumerFactory>() 25 .Register<IConnectionFactory, ConnectionFactoryWrapper>() 26 .Register<IPersistentChannelFactory, PersistentChannelFactory>() 27 .Register<IClientCommandDispatcherFactory, ClientCommandDispatcherFactory>() 28 .Register<IHandlerCollectionFactory, HandlerCollectionFactory>() 29 .Register<IAdvancedBus, RabbitAdvancedBus>() 30 .Register<IRpc, Rpc>() 31 .Register<ISendReceive, SendReceive>() 32 .Register<IBus, RabbitBus>(); 33 #endregion 34 } 35 }
View Code

  對DefaultConsumerErrorStrategy的程式碼進行研究後發現,EasyNetQ是產生一個處理錯誤訊息佇列的Exchanges,將訊息二次封裝後推送到預設的錯誤佇列

  只要將訊息佇列改為源訊息佇列,取消訊息二次封裝,直接推送到佇列

  佇列繫結

 1         private string DeclareErrorExchangeAndBindToDefaultErrorQueue(IModel model, ConsumerExecutionContext context)
 2         {
 3             var originalRoutingKey = context.Info.RoutingKey;
 4 
 5             return errorExchanges.GetOrAdd(originalRoutingKey, _ =>
 6             {
 7                 var exchangeName = conventions.ErrorExchangeNamingConvention(context.Info);
 8                 model.ExchangeDeclare(exchangeName, ExchangeType.Direct, durable: true);
 9                 //更改第一個引數
10                 model.QueueBind(originalRoutingKey, exchangeName, originalRoutingKey);
11                 return exchangeName;
12             });
13         }
View Code

  訊息處理

 1         public virtual PostExceptionAckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)
 2         {
 3             Preconditions.CheckNotNull(context, "context");
 4             Preconditions.CheckNotNull(exception, "exception");
 5 
 6             try
 7             {
 8                 Connect();
 9 
10                 using (var model = connection.CreateModel())
11                 {
12                     var errorExchange = DeclareErrorExchangeQueueStructure(model, context);
13                     var messageBody = context.Body;
14                     var properties = model.CreateBasicProperties();
15                     context.Properties.CopyTo(properties);
16                     properties.Type = context.Properties.Type;
17                     //訊息持久化
18                     properties.SetPersistent(true);
19 
20                     model.BasicPublish(errorExchange, context.Info.RoutingKey, properties, messageBody);
21 
22                 }
23             }          
24             catch (Exception unexpectedException)
25             {
26                 // Something else unexpected has gone wrong :(
27                 logger.ErrorWrite("EasyNetQMessageQueue Consumer Error Handler: Failed to publish error message\nException is:\n"
28                     + unexpectedException);
29             }
30             return Consumer.PostExceptionAckStrategy.ShouldAck;
31         }
View Code

  因為EasyNetQ在CreateBus的時候就會將相關事件註冊,所以我們只需最後自行在config中加入有關配置,在註冊事件的位置加入判斷即可讓異常拋回

  由於剛接觸RabbitMQ,對於一些概念的理解可能還不是非常到位,這是我目前所能找到的解決方案

  當然可能以後的版本中,EasyNetQ會加入對事務的操作,現在EasyNetQ支援了一種叫Publisher Confirms的方法,貌似還是不是我想要的

相關推薦

EasyNetQ定義異常訊息處理

  20140310補充:   rabbitmq有requeue屬性,可以選擇訊息是否返回佇列,另,本文的解決方式非常之山寨,只能應用於傳送和接收方式。     這幾天在折騰訊息佇列,在.Net環境下有基於RabbitMQ有很多有API的選擇,最後選擇了比較簡單的EasyNetQ(http://eas

SpringBoot全域性異常捕獲及處理(包括定義異常捕獲處理

在做專案的時候需要對自定義異常做捕獲和處理,現在將程式碼記錄下來便於以後查閱。 1、全域性異常捕捉處理 @ControllerAdvice( annotations = {RestController.class} ) public class ExceptionHandlerAdv

定義異常處理

1.自定義專案異常 使用lombok自動生成無參和全部引數的構造方法,並生成對應欄位的get和set方法 @Data @AllArgsConstructor @NoArgsConstructor public class MyException extends Exception{

Springboot2:簡單的對定義異常進行處理

當我們需要對應不同的異常問題返回給前端不同的提示語時,我們通常會痛苦不堪,手忙腳亂。有的人甚至在裡層根據情況返回int狀態碼,再

@ControllerAdvice定義異常統一處理

正常來說一個系統肯定有很多業務異常。而這些業務異常的資訊如何返回給前臺呈現給使用者。比如使用者的某些操作不被允許,需要給使用者提示。 Spring 提供了@ControllerAdvice這個註解,這個註解可以實現全域性異常處理,全域性資料繫結,全域性資料預處理,這裡主要說下使用這個註解實現全域性異常處理。

10.異常處理定義異常、斷言

類型 出現 color spa 同時 就是 put exception valid 什麽是異常: 當程序遭遇某些非正常問題的時候就會拋出異常:比如int()只能處理能轉化成int的對象,如果傳入一個不能轉化的對象就會報錯並拋出異常 常用的異常有: ValueError :

約束、定義異常、加密、日誌處理

update () return 知識點 排查 file salt mat ror 一、約束   BaseMessage類用於約束,約束其派生類:保證派生類中必須編寫方法,不然執行就可能報錯。 class BaseMessage(object): def send

全局異常方式處理定義異常 @RestControllerAdvice + @ExceptionHandler

不可 end vat 信息 actor 頁面 ech opera 方式 前言 本文講解使用 @ControllerAdvice + @ExceptionHandler 進行全局的 Controller 層異常處理,可以處理大部分開發中用到的自自定義業務異常處理了,

Springboot定義異常處理

tro 定義 異常 handler .get png map spa sta 1.自定義異常類 import lombok.Data; @Data public class UserException extends RuntimeException { pr

Struts2定義攔截器處理全域性異常

  今天在整理之前的專案的時候想著有的action層沒有做異常處理,於是想著自定義攔截器處理一下未攔截的異常。   程式碼: package cn.xm.exam.action.safeHat; import java.util.HashMap; import java.util

SpringMVC之全域性異常處理 ——統一返回格式(定義異常

SpringMVC之全域性異常處理 老規矩開篇咱們先介紹一下背景 因當前APP越來越流行,或是提供的第三方介面等等都需要你來統一返回格式。這個時候問題就來了 ,很多時候系統的異常以及為了程式碼的可讀性我們必然會抽出很多的間接層(例如資料格式校驗、資料有效性校驗等),一層層的retur

Java IO流中的異常處理以及定義異常例項

文章目錄 異常 自定義異常例項 finally中特殊情況例項 異常 1、 Throwable類  a) 嚴重問題:Error,比如說記憶體不夠,一般程式中不進

springboot-統一返回資料,定義異常異常處理

springboot-統一返回資料,統一異常處理,自定義異常 程式碼下載: https://github.com/2010yhh/springBoot-demos.git 環境 idea2018,jdk1.8, springboot版本:1.5.9.RELEAS

定義異常處理類及錯誤提示類的使用技巧

一.關於自定義處理異常類及聯合自定義錯誤提示類的使用     1.專案內自定義py檔案,使用者存放自定義的錯誤提示類 class BaseResponse(object):     def __init__(self):

shiro與spring security如何用定義異常處理401

背景 現在是前後端分離的時代,後端必然要統一處理返回結果,比如定義一個返回物件 public class ResponseData<T> { /** * 統一返回碼 */ public String rtnCode; /*

定義異常異常全域性處理

一、Java異常分類 java中異常均繼承自Throwable,其有兩個重要的直接子類error與exception Error: 大部分是由虛擬機器報出來的錯誤,是程式無法處理的錯誤,如 OutOfMemoryError,當JVM需要更多記憶體空間而得不

springboot錯誤資訊返回與定義異常處理

在很多系統中,我們可能在執行時出現各種異常,有系統異常,程式碼異常,自定義的異常。這時候我們就要把這些錯誤異常資訊返回去給客戶端檢視才知道是哪裡出問題。這裡我們就說說spring boot的異常處理。   springboot錯誤資訊返回: 實際上,如果我們訪問一個url,不同

datatables定義異常處理

關閉alert提示,改為控制檯輸出錯誤 $.fn.dataTable.ext.errMode = 'none'; $('#example') .on( 'error.dt', function ( e, settings, techNote, message ) {

2.14 異常處理3:定義異常

自定義異常概述 系統提供的異常類error無所不包但控制的粒度很粗糙 如果想要進行很精細化的報錯和處理錯誤,就需要用到自定義異常 我們只要實現error介面中的Error() string方法,就可

SpringMVC中的異常處理(全域性異常處理定義異常類進行統一處理)

前言: WEB開發中,總會有一些不可預料的錯誤,對於一些課預測的異常,我們可以自定義一個異常類,然後再載入個全域性異常處理器,對系統中出現的異常進行統一的處理。 注意:當你在Springmvc配置檔案中配置全域性異常處理器的時候,只要如下配置即可: <!