dubbo整合netflix原生的hystrix框架
版權宣告:Mr_Iron_Hand原創 https://blog.csdn.net/kailuan2zhong/article/details/81367106 一、需求描述 需要將一個現有專案新增熔斷功能,而這個專案的技術棧為spring、dubbo等,並且使用的dubbo-Main方式啟動。
二、存在問題 由於專案並未使用web容器、也未使用spring-boot。而目前能拿到的資料中,都是在spring-boot下進行hystrix整合的,不得不說spring-boot的開箱即用理念,整合起來真的非常方便。因此,基於當前結構無法快速整合,也就需要開始排坑。
三、解決思路 目前有幾個方向來解決問題: 1、當現實情況改造為spring-boot不是上策 2、改造為web容器啟動會引入tomcat等元件,運維不一定答應 3、想辦法保持dubbo的Main方式啟動、spring保留,找辦法。
四、開始排坑 前提條件
1、對Hystrix框架要熟悉使用 2、對dubbo框架有一定認知,熟悉Filter使用
四-1、dubbo服務整合Hystrix 找思路
根據3中的方向,需要準備: 1、檢視spring-cloud-starter-hystrix中的依賴,決定使用netflix的原生hystrix元件,spring-cloud能打包,我也可以。 2、引入了hystrix-core和hystrix-javanica
測試思路(Hystrix是否生效)
1、註解方式配置@HystrixCommand等,測試介面,發現異常情況下沒有進入降級流程。 2、應該是dubbo框架對內部異常進行了封裝,導致異常丟擲後直接進入了dubbo處理流程 3、定義一個Filter,攔截dubbo的請求,對response進行分析,拿到success/exception,根據這裡的結構,進入降級流程。 4、根據3的思路,需要實現HystrixCommand介面,進而在Filter中進行整合。
定義Filter
import com.alibaba.dubbo.common.extension.Activate; import com.alibaba.dubbo.rpc.*; @Activate(group = "consumer") public class HystrixFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation, "aaa"); Result res = command.execute(); return res; } }
實現HystrixCommand
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcResult;
import com.netflix.hystrix.*;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class DubboHystrixCommand extends HystrixCommand<Result> {
private static Logger logger = LoggerFactory.getLogger(DubboHystrixCommand.class);
private static final int DEFAULT_THREADPOOL_CORE_SIZE = 30;
private Invoker invoker;
private Invocation invocation;
private String fallbackName;
public DubboHystrixCommand(Invoker<?> invoker, Invocation invocation, String fallbackName) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName()))
.andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(), invocation.getArguments() == null ? 0 : invocation.getArguments().length)))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withCircuitBreakerRequestVolumeThreshold(20)//10秒鐘內至少19此請求失敗,熔斷器才發揮起作用
.withCircuitBreakerSleepWindowInMilliseconds(3000)//熔斷器中斷請求30秒後會進入半開啟狀態,放部分流量過去重試
.withCircuitBreakerErrorThresholdPercentage(50)//錯誤率達到50開啟熔斷保護
.withExecutionTimeoutEnabled(false)//使用dubbo的超時,禁用這裡的超時
)
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
.withCoreSize(getThreadPoolCoreSize(invoker.getUrl()))));//執行緒池為30
this.invoker = invoker;
this.invocation = invocation;
this.fallbackName = fallbackName;
}
/**
* 獲取執行緒池大小
* <dubbo:parameter key="ThreadPoolCoreSize" value="20" />
*
* @param url
* @return
*/
private static int getThreadPoolCoreSize(URL url) {
if (url != null) {
int size = url.getParameter("ThreadPoolCoreSize", DEFAULT_THREADPOOL_CORE_SIZE);
if (logger.isDebugEnabled()) {
logger.debug("ThreadPoolCoreSize:" + size);
}
return size;
}
return DEFAULT_THREADPOOL_CORE_SIZE;
}
@Override
protected Result run() throws Exception {
Result res = invoker.invoke(invocation);
if (res.hasException()) {
throw new HystrixRuntimeException(HystrixRuntimeException.FailureType.COMMAND_EXCEPTION,
DubboHystrixCommand.class,
res.getException().getMessage(),
res.getException(), null);
}
return res;
}
@Override
protected Result getFallback() {
if (StringUtils.isEmpty(fallbackName)) {
//丟擲原本的異常
return super.getFallback();
}
return new RpcResult("the dubbo fallback.");
}
}
註冊、使用Filter
hystrixFilter=*****.HystrixFilter
測試結果:可以正常進入降級流程,並且實現熔斷功能(熔斷、半開、恢復)
<dubbo:service interface="*****.service.HelloService" ref="helloService" timeout="*****" filter="hystrixFilter">
<dubbo:method name="say" retries="0" />
<dubbo:parameter key="ThreadPoolCoreSize" value="30" />
</dubbo:service>