1. 程式人生 > >Spring Cloud Ribbon 全解 (3)

Spring Cloud Ribbon 全解 (3)

上一篇我們瞭解到Ribbon主要由如下幾個元件組成:

  1. 所有Ribbon負載均衡器需要實現的介面IClient
  2. 服務例項列表維護機制實現的介面ServerList
  3. 負載均衡資料記錄LoadBalancerStats
  4. 負責選取Server的介面ILoadBalancer
  5. 負載均衡選取規則實現的介面IRule
  6. 檢查例項是否存活實現的介面IPing
  7. 服務例項列表更新機制實現的介面ServerListUpdater
  8. 服務例項列表過濾機制ServerListFilter

我們會逐個分析

1. 所有Ribbon負載均衡器需要實現的介面IClient

對於這個IClient,之前我們說到執行器邏輯,例如重試還有異常處理,都在這裡處理。我們看他的預設抽象類實現AbstractLoadBalancerAwareClient:

AbstractLoadBalancerAwareClient.java

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    //獲取重試處理器,這個由其他實現類動態實現
    RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
    //構造LoadBalancerCommand,RxJava風格
LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder() .withLoadBalancerContext(this) .withRetryHandler(handler) .withLoadBalancerURI(request.getUri()) .build(); try { return command.submit( new ServerOperation<T>() { @Override
public Observable<T> call(Server server) { //修改原始url為實際的url URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { //執行請求 return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); } catch (Exception e) { Throwable t = e.getCause(); if (t instanceof ClientException) { throw (ClientException) t; } else { throw new ClientException(e); } } } public abstract RequestSpecificRetryHandler getRequestSpecificRetryHandler(S request, IClientConfig requestConfig);

這個構造的LoadBalancerCommand是一個RxJava風格的,它包含了重試和異常處理機制:

LoadBalancerCommand.java

//返回一個只包含一個Server的Observable,但是每次從負載均衡器中獲取一個
private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> next) {
            try {
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}
public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();

    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }

    //獲取在每個服務例項重試的的次數
    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    //最多嘗試幾個服務例項
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

    //對於每個服務例項的呼叫邏輯
    //預設field server是null,通過selectServer()方法獲取一個Server
    Observable<T> o = 
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                //對於每個Server,按順序對映為對於每個Server包含重試邏輯的請求呼叫
                public Observable<T> call(Server server) {
                    //設定上下文
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);

                    //每個Server包含重試邏輯的請求呼叫
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    context.incAttemptCount();
                                    //增加Server正在處理的請求計數
                                    loadBalancerContext.noteOpenConnection(stats);

                                    //監聽器
                                    if (listenerInvoker != null) {
                                        try {
                                            listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                        } catch (AbortExecutionException e) {
                                            return Observable.error(e);
                                        }
                                    }

                                    //計時器
                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                    //operation.call(server)就是剛剛分析的AbstractLoadBalancerAwareClient傳過來的ServerOperation,就是直接對這個Server呼叫請求
                                    //doOnEach的操作就是記錄請求前後的一些資料用於負載均衡資料統計
                                    return operation.call(server).doOnEach(new Observer<T>() {
                                        private T entity;
                                        @Override
                                        public void onCompleted() {
                                            //記錄請求完成
                                            recordStats(tracer, stats, entity, null);
                                        }

                                        @Override
                                        public void onError(Throwable e) {
                                            //記錄請求結束
                                            recordStats(tracer, stats, null, e);
                                            logger.debug("Got error {} when executed on server {}", e, server);
                                            //發生了錯誤,通知listener
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                            }
                                        }

                                        @Override
                                        public void onNext(T entity) {
                                            //因為只有呼叫請求成功只有一個結果(只有一個請求), 這裡的entity就是結果,只要收到結果就代表請求成功
                                            this.entity = entity;
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                            }
                                        }                            

                                        private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                            tracer.stop();
                                            loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                        }
                                    });
                                }
                            });

                    if (maxRetrysSame > 0)
                        //是否retry
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });

    if (maxRetrysNext > 0 && server == null)
        //是否retry,如果retry回撥用selectServer()返回下一個Server
        o = o.retry(retryPolicy(maxRetrysNext, false));

    //異常處理
    return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable e) {
            if (context.getAttemptCount() > 0) {
                //如果超過重試次數,則拋異常
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + maxRetrysNext
                            + " retries, while making a call for: " + context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max " + maxRetrysSame
                            + " retries, while making a call for: " + context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}

2. 服務例項列表維護機制實現的介面ServerList

image

AbstractServerList.java

其實這個抽象類一是在實現ServerList介面的同時,實現了IClientConfigAware這個介面,代表是可配置的。
同時,提供了一個生成預設ServerListFilter(這個Filter的實現類是由NIWSServerListFilterClassName這個配置決定,預設是ZoneAffinityServerListFilter)的方法

public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {   

    public AbstractServerListFilter<T> getFilterImpl(IClientConfig niwsClientConfig) throws ClientException{
        try {
            String niwsServerListFilterClassName = niwsClientConfig
                    .getProperty(
                            CommonClientConfigKey.NIWSServerListFilterClassName,
                            ZoneAffinityServerListFilter.class.getName())
                    .toString();

            AbstractServerListFilter<T> abstractNIWSServerListFilter = 
                    (AbstractServerListFilter<T>) ClientFactory.instantiateInstanceWithClientConfig(niwsServerListFilterClassName, niwsClientConfig);
            return abstractNIWSServerListFilter;
        } catch (Throwable e) {
            throw new ClientException(
                    ClientException.ErrorType.CONFIGURATION,
                    "Unable to get an instance of CommonClientConfigKey.NIWSServerListFilterClassName. Configured class:"
                            + niwsClientConfig
                                    .getProperty(CommonClientConfigKey.NIWSServerListFilterClassName), e);
        }
    }
}

ConfigurationBasedServerList.java

這個是預設的實現,如果沒有特殊配置,ServerList的實現類就是ConfigurationBasedServerList;這個實際上就是從配置中讀取ServerList,這個配置可以是動態配置,例如是Archaius

public class ConfigurationBasedServerList extends AbstractServerList<Server>  {

    private IClientConfig clientConfig;

    @Override
    public List<Server> getInitialListOfServers() {
        return getUpdatedListOfServers();
    }

    @Override
    public List<Server> getUpdatedListOfServers() {
        String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);
        return derive(listOfServers);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        this.clientConfig = clientConfig;
    }
    //可以看出這個配置就是以逗號分隔的字串
    private List<Server> derive(String value) {
        List<Server> list = Lists.newArrayList();
        if (!Strings.isNullOrEmpty(value)) {
            for (String s: value.split(",")) {
                list.add(new Server(s.trim()));
            }
        }
        return list;
    }
}

DiscoveryEnabledNIWSServerList.java

這個就是從Eureka上面獲取Server列表的類,構造的時候需要傳入相關配置以及最重要的EurekaClient的Provider來獲取合適的EurekaClient以便於獲取Server列表。

實現ServerList介面的方法都是基於obtainServersViaDiscovery這個方法:

@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
    return obtainServersViaDiscovery();
}

@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
    return obtainServersViaDiscovery();
}

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
    List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

    //如果EurekaClient沒有被初始化,則日誌報警並返回空的列表
    if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
        logger.warn("EurekaClient has not been initialized yet, returning an empty list");
        return new ArrayList<DiscoveryEnabledServer>();
    }
    EurekaClient eurekaClient = eurekaClientProvider.get();

    //這裡的vipAddresses其實就是微服務名稱的各種形式,但是注意,它們代表的是同一個微服務
    if (vipAddresses!=null){
        for (String vipAddress : vipAddresses.split(",")) {
            // if targetRegion is null, it will be interpreted as the same region of client
            List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
            for (InstanceInfo ii : listOfInstanceInfo) {
                if (ii.getStatus().equals(InstanceStatus.UP)) {

                    //是否覆蓋port
                    if(shouldUseOverridePort){
                        if(logger.isDebugEnabled()){
                            logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                        }

                        //這裡複製一份是因為不希望其他的地方修改原有的例項資訊
                        InstanceInfo copy = new InstanceInfo(ii);

                        if(isSecure){
                            ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                        }else{
                            ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                        }
                    }

                    DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                    des.setZone(DiscoveryClient.getZone(ii));
                    serverList.add(des);
                }
            }

            //如果有一個vipAddress有服務列表,我們就不用獲取剩餘的了
            if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                break; 
            }
        }
    }
    return serverList;
}

到這裡我們可以看出,Ribbon和Eureka的配合其實就是Ribbon從Eureka中利用微服務名稱獲取Server列表;那麼這個列表是如何更新的呢,在Eureka的章節我們提到過,Ribbon定時從EurekaClient獲取服務例項列表更新,這就涉及到了下一個我們要講到的Ribbon元素 - 服務例項列表更新機制實現的介面ServerListUpdater