1. 程式人生 > >dubbo整合netflix原生的hystrix框架

dubbo整合netflix原生的hystrix框架

版權宣告:Mr_Iron_Hand原創    https://blog.csdn.net/kailuan2zhong/article/details/81367106 一、需求描述 需要將一個現有專案新增熔斷功能,而這個專案的技術棧為spring、dubbo等,並且使用的dubbo-Main方式啟動。

二、存在問題 由於專案並未使用web容器、也未使用spring-boot。而目前能拿到的資料中,都是在spring-boot下進行hystrix整合的,不得不說spring-boot的開箱即用理念,整合起來真的非常方便。因此,基於當前結構無法快速整合,也就需要開始排坑。

三、解決思路 目前有幾個方向來解決問題: 1、當現實情況改造為spring-boot不是上策 2、改造為web容器啟動會引入tomcat等元件,運維不一定答應 3、想辦法保持dubbo的Main方式啟動、spring保留,找辦法。

四、開始排坑 前提條件

1、對Hystrix框架要熟悉使用 2、對dubbo框架有一定認知,熟悉Filter使用

四-1、dubbo服務整合Hystrix 找思路

根據3中的方向,需要準備: 1、檢視spring-cloud-starter-hystrix中的依賴,決定使用netflix的原生hystrix元件,spring-cloud能打包,我也可以。 2、引入了hystrix-core和hystrix-javanica

測試思路(Hystrix是否生效)

1、註解方式配置@HystrixCommand等,測試介面,發現異常情況下沒有進入降級流程。 2、應該是dubbo框架對內部異常進行了封裝,導致異常丟擲後直接進入了dubbo處理流程 3、定義一個Filter,攔截dubbo的請求,對response進行分析,拿到success/exception,根據這裡的結構,進入降級流程。 4、根據3的思路,需要實現HystrixCommand介面,進而在Filter中進行整合。

定義Filter

import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.*;
 
@Activate(group = "consumer")
public class HystrixFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
 
        DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation, "aaa");
        Result res = command.execute();
        return res;
    }
}

實現HystrixCommand

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcResult;
import com.netflix.hystrix.*;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.Map;
 
 
public class DubboHystrixCommand extends HystrixCommand<Result> {
 
    private static Logger logger = LoggerFactory.getLogger(DubboHystrixCommand.class);
    private static final int DEFAULT_THREADPOOL_CORE_SIZE = 30;
    private Invoker invoker;
    private Invocation invocation;
    private String fallbackName;
 
    public DubboHystrixCommand(Invoker<?> invoker, Invocation invocation, String fallbackName) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName()))
                .andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(), invocation.getArguments() == null ? 0 : invocation.getArguments().length)))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withCircuitBreakerRequestVolumeThreshold(20)//10秒鐘內至少19此請求失敗,熔斷器才發揮起作用
                        .withCircuitBreakerSleepWindowInMilliseconds(3000)//熔斷器中斷請求30秒後會進入半開啟狀態,放部分流量過去重試
                        .withCircuitBreakerErrorThresholdPercentage(50)//錯誤率達到50開啟熔斷保護
                        .withExecutionTimeoutEnabled(false)//使用dubbo的超時,禁用這裡的超時
                        )
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                        .withCoreSize(getThreadPoolCoreSize(invoker.getUrl()))));//執行緒池為30
 
        this.invoker = invoker;
        this.invocation = invocation;
        this.fallbackName = fallbackName;
    }
 
 
    /**
     * 獲取執行緒池大小
     * <dubbo:parameter key="ThreadPoolCoreSize" value="20" />
     *
     * @param url
     * @return
     */
    private static int getThreadPoolCoreSize(URL url) {
        if (url != null) {
            int size = url.getParameter("ThreadPoolCoreSize", DEFAULT_THREADPOOL_CORE_SIZE);
            if (logger.isDebugEnabled()) {
                logger.debug("ThreadPoolCoreSize:" + size);
            }
            return size;
        }
 
        return DEFAULT_THREADPOOL_CORE_SIZE;
 
    }
 
    @Override
    protected Result run() throws Exception {
        Result res = invoker.invoke(invocation);
        if (res.hasException()) {
            throw new HystrixRuntimeException(HystrixRuntimeException.FailureType.COMMAND_EXCEPTION,
                    DubboHystrixCommand.class,
                    res.getException().getMessage(),
                    res.getException(), null);
        }
        return res;
    }
 
    @Override
    protected Result getFallback() {
        if (StringUtils.isEmpty(fallbackName)) {
            //丟擲原本的異常
            return super.getFallback();
        }
        return new RpcResult("the dubbo fallback.");
    }
}

註冊、使用Filter

hystrixFilter=*****.HystrixFilter

測試結果:可以正常進入降級流程,並且實現熔斷功能(熔斷、半開、恢復)

<dubbo:service interface="*****.service.HelloService" ref="helloService"  timeout="*****" filter="hystrixFilter">
    <dubbo:method name="say" retries="0" />
    <dubbo:parameter key="ThreadPoolCoreSize" value="30" />
</dubbo:service>