
SpringCloud--Ribbon--Source Code Analysis--ILoadBalancer & ServerListUpdater & ServerListFilter Implementations

  The earlier post SpringCloud--Ribbon--Source Code Analysis--Ribbon Entry Point shows Ribbon's overall flow, and in that flow obtaining a server is a key step:

    protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
        if (loadBalancer == null) {
            return null;
        }
        // Use 'default' on a null hint, or just pass it on?
        return loadBalancer.chooseServer(hint != null ? hint : "default");
    }

  As shown, getServer ultimately calls the chooseServer method of the ILoadBalancer interface. ILoadBalancer provides the following methods:

public interface ILoadBalancer {
    // Adds server instances to the instance list maintained by the load balancer
    public void addServers(List<Server> newServers);
    // Picks one instance from the maintained list according to some strategy
    public Server chooseServer(Object key);
    // Marks an instance as down
    public void markServerDown(Server server);
    @Deprecated
    public List<Server> getServerList(boolean availableOnly);
    // Returns the instances that are currently up
    public List<Server> getReachableServers();
    // Returns all instances
    public List<Server> getAllServers();
}

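  The comments in the code above describe each method. Among the implementations of this interface, AbstractLoadBalancer implements ILoadBalancer, and BaseLoadBalancer, DynamicServerListLoadBalancer and ZoneAwareLoadBalancer each extend the class before them.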

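  The configuration class RibbonClientConfiguration shows that ZoneAwareLoadBalancer is the default implementation of ILoadBalancer, unless a different implementation is configured through propertiesFactory: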

    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }

  Let's analyze these classes one at a time:

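  1. AbstractLoadBalancer

  AbstractLoadBalancer is the abstract implementation of the ILoadBalancer interface. It defines the enum ServerGroup, which groups server instances into three categories: all instances (ALL), instances that are up (STATUS_UP), and instances that are not up (STATUS_NOT_UP).

  It also defines a no-argument chooseServer() that simply calls chooseServer(null), plus two abstract methods: getServerList(ServerGroup), which returns the instances belonging to a given group, and getLoadBalancerStats(), which returns the LoadBalancerStats object. LoadBalancerStats stores the current attributes and statistics of each server instance in the load balancer.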

public abstract class AbstractLoadBalancer implements ILoadBalancer {
    
    public enum ServerGroup{
        ALL,
        STATUS_UP,
        STATUS_NOT_UP        
    }
        
    public Server chooseServer() {
        return chooseServer(null);
    }

    public abstract List<Server> getServerList(ServerGroup serverGroup);

    public abstract LoadBalancerStats getLoadBalancerStats();    
}
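To illustrate how the three ServerGroup values partition the instances, here is a minimal sketch. SimpleServer, ServerGroupSketch and GroupingLoadBalancerSketch are hypothetical stand-ins, not Ribbon classes, and for simplicity the groups are derived from each server's alive flag rather than from the two separate lists that BaseLoadBalancer maintains.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for AbstractLoadBalancer.ServerGroup.
enum ServerGroupSketch { ALL, STATUS_UP, STATUS_NOT_UP }

// Hypothetical stand-in for Ribbon's Server: an id plus an alive flag.
final class SimpleServer {
    final String id;
    volatile boolean alive;

    SimpleServer(String id, boolean alive) {
        this.id = id;
        this.alive = alive;
    }
}

final class GroupingLoadBalancerSketch {
    private final List<SimpleServer> allServers = new ArrayList<SimpleServer>();

    void add(SimpleServer server) {
        allServers.add(server);
    }

    // Answers getServerList(ServerGroup) by filtering on each server's alive flag.
    List<SimpleServer> getServerList(ServerGroupSketch group) {
        List<SimpleServer> result = new ArrayList<SimpleServer>();
        for (SimpleServer s : allServers) {
            boolean include;
            switch (group) {
                case STATUS_UP:
                    include = s.alive;
                    break;
                case STATUS_NOT_UP:
                    include = !s.alive;
                    break;
                default: // ALL
                    include = true;
            }
            if (include) {
                result.add(s);
            }
        }
        return Collections.unmodifiableList(result);
    }
}
```

Returning an unmodifiable view follows the same idea as getReachableServers and getAllServers in BaseLoadBalancer: callers can read the list but not change the balancer's state through it.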

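  2. BaseLoadBalancer

  BaseLoadBalancer is the basic implementation of Ribbon's load balancer and defines most of the foundational pieces of a load balancer:

  (1) It defines and maintains two lists of Server objects: one holds all instances, the other holds only the instances that are up.

  (2) It defines LoadBalancerStats, which stores the attributes and statistics of each instance in the load balancer.

  (3) It defines the IPing object used to check whether an instance is up.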

    protected IPing ping = null;

    @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections
            .synchronizedList(new ArrayList<Server>());
    @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> upServerList = Collections
            .synchronizedList(new ArrayList<Server>());

    protected LoadBalancerStats lbStats;


    @Override
    public List<Server> getReachableServers() {
        return Collections.unmodifiableList(upServerList);
    }

    @Override
    public List<Server> getAllServers() {
        return Collections.unmodifiableList(allServerList);
    }

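  (4) It defines IPingStrategy, the strategy that decides how the IPing checks are executed. The default, SerialPingStrategy, pings the servers one after another: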

    private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
    protected IRule rule = DEFAULT_RULE;

    protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;

    private static class SerialPingStrategy implements IPingStrategy {

        @Override
        public boolean[] pingServers(IPing ping, Server[] servers) {
            int numCandidates = servers.length;
            boolean[] results = new boolean[numCandidates];

            logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);

            for (int i = 0; i < numCandidates; i++) {
                results[i] = false; /* Default answer is DEAD. */
                try {
                    // NOTE: IFF we were doing a real ping
                    // assuming we had a large set of servers (say 15)
                    // the logic below will run them serially
                    // hence taking 15 times the amount of time it takes
                    // to ping each server
                    // A better method would be to put this in an executor
                    // pool
                    // But, at the time of this writing, we dont REALLY
                    // use a Real Ping (its mostly in memory eureka call)
                    // hence we can afford to simplify this design and run
                    // this
                    // serially
                    if (ping != null) {
                        results[i] = ping.isAlive(servers[i]);
                    }
                } catch (Exception e) {
                    logger.error("Exception while pinging Server: '{}'", servers[i], e);
                }
            }
            return results;
        }
    }
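The comment inside SerialPingStrategy notes that pinging many servers one after another is slow and that an executor pool would be better. A hypothetical parallel variant could look like the sketch below; PingerSketch and ParallelPingStrategySketch are illustrative names, not part of Ribbon, and a production version would also want a timeout on each ping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical ping abstraction standing in for Ribbon's IPing.
interface PingerSketch {
    boolean isAlive(String server) throws Exception;
}

// Hypothetical parallel alternative to SerialPingStrategy: every ping is submitted
// to a caller-supplied pool, then the results are collected in order.
final class ParallelPingStrategySketch {
    private final ExecutorService pool;

    ParallelPingStrategySketch(ExecutorService pool) {
        this.pool = pool;
    }

    boolean[] pingServers(final PingerSketch ping, String[] servers) {
        boolean[] results = new boolean[servers.length]; // default answer is false (dead)
        if (ping == null) {
            return results;
        }
        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
        for (final String server : servers) {
            futures.add(pool.submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    return ping.isAlive(server);
                }
            }));
        }
        for (int i = 0; i < futures.size(); i++) {
            try {
                results[i] = futures.get(i).get();
            } catch (ExecutionException e) {
                results[i] = false; // a ping that throws counts as dead
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // stop waiting; the remaining entries stay false
            }
        }
        return results;
    }
}
```

Like the serial strategy, this sketch treats any ping that throws as dead; the difference is only that total time is bounded by the slowest ping rather than the sum of all pings.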

  (5) It defines the IRule object that makes the actual load-balancing choice; if none is specified, RoundRobinRule is used by default.

    private final static IRule DEFAULT_RULE = new RoundRobinRule();
    
    protected IRule rule = DEFAULT_RULE;
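A rough sketch of round-robin selection with a lock-free counter follows. RoundRobinSketch is a hypothetical class; it only shows the modulo-and-compare-and-set idea and omits the extra checks RoundRobinRule performs on the server it picks.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin chooser using a compare-and-set counter.
final class RoundRobinSketch {
    private final AtomicInteger cursor = new AtomicInteger(0);

    // Returns the current index and advances the shared cursor with a CAS loop,
    // so concurrent advances are never lost and the index stays within range.
    private int nextIndex(int size) {
        for (;;) {
            int current = cursor.get();
            int next = (current + 1) % size;
            if (cursor.compareAndSet(current, next)) {
                return current % size;
            }
        }
    }

    <T> T choose(List<T> servers) {
        if (servers == null || servers.isEmpty()) {
            return null; // nothing to choose from
        }
        return servers.get(nextIndex(servers.size()));
    }
}
```

Calling choose four times on the list a, b, c returns a, b, c, a. The modulo on the returned value keeps the index valid even if the list has shrunk since the last call.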

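  In addition, the BaseLoadBalancer constructor calls setupPingTask, which (unless canSkipPing() says pinging can be skipped) starts a scheduled task that checks whether each Server is healthy. By default the task runs every 10 seconds (pingIntervalSeconds).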

    public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
            IPing ping, IPingStrategy pingStrategy) {
    
        logger.debug("LoadBalancer [{}]:  initialized", name);
        
        this.name = name;
        this.ping = ping;
        this.pingStrategy = pingStrategy;
        setRule(rule);
        setupPingTask();
        lbStats = stats;
        init();
    }

    void setupPingTask() {
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                true);
        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
        forceQuickPing();
    }
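Each run of the ping task pings all servers and rebuilds the list of servers that are up. The sketch below shows a single pass under simplified assumptions: PingCycleSketch and its Node class are hypothetical, the ping is a plain Predicate, and scheduling is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of one ping pass, loosely modeled on BaseLoadBalancer's ping task.
final class PingCycleSketch {
    // Hypothetical server with a mutable alive flag.
    static final class Node {
        final String id;
        boolean alive;

        Node(String id, boolean alive) {
            this.id = id;
            this.alive = alive;
        }
    }

    // Pings every node, updates its alive flag, records nodes whose status flipped
    // in `changed`, and returns the new list of nodes that are up.
    static List<Node> runOnce(List<Node> all, Predicate<Node> ping, List<Node> changed) {
        List<Node> up = new ArrayList<Node>();
        for (Node node : all) {
            boolean isAlive;
            try {
                isAlive = ping.test(node);
            } catch (RuntimeException e) {
                isAlive = false; // a failing ping counts as dead
            }
            if (isAlive != node.alive) {
                changed.add(node);
            }
            node.alive = isAlive;
            if (isAlive) {
                up.add(node);
            }
        }
        return up;
    }
}
```

In BaseLoadBalancer the equivalent pass runs on the timer created in setupPingTask; here it is a plain static method so a single cycle can be exercised directly.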

  BaseLoadBalancer also implements the methods defined by the ILoadBalancer interface:

(1) Adding servers

    @Override
    public void addServers(List<Server> newServers) {
        if (newServers != null && newServers.size() > 0) {
            try {
                ArrayList<Server> newList = new ArrayList<Server>();
                newList.addAll(allServerList);
                newList.addAll(newServers);
                setServersList(newList);
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
            }
        }
    }

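(2) Choosing a server

  As the source below shows, the choice is ultimately delegated to the choose method of the IRule interface; if no rule is set, or the rule throws an exception, null is returned.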

    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }

(3) Marking an instance as down

    public void markServerDown(Server server) {
        if (server == null || !server.isAlive()) {
            return;
        }

        logger.error("LoadBalancer [{}]:  markServerDown called on [{}]", name, server.getId());
        server.setAlive(false);
        // forceQuickPing();

        notifyServerStatusChangeListener(singleton(server));
    }

3. DynamicServerListLoadBalancer

  DynamicServerListLoadBalancer extends BaseLoadBalancer. It adds two capabilities to the basic load balancer: updating the server instance list dynamically at runtime, and filtering that list.

  As the following source shows, compared with BaseLoadBalancer it adds three members: ServerList, ServerListFilter and ServerListUpdater.

    volatile ServerList<T> serverListImpl;

    volatile ServerListFilter<T> filter;

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

    protected volatile ServerListUpdater serverListUpdater;

    public DynamicServerListLoadBalancer() {
        super();
    }

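 (1) ServerList

  ServerList is an interface with two methods: getInitialListOfServers, which returns the initial list of server instances, and getUpdatedListOfServers, which returns the updated list.

  ServerList has several implementations; the ones that matter here are DiscoveryEnabledNIWSServerList and DomainExtractingServerList.

  Which one does DynamicServerListLoadBalancer use? Both methods must fetch instances from the service registry, so Ribbon needs a way to access Eureka. The configuration class for the Ribbon/Eureka integration, org.springframework.cloud.netflix.ribbon.eureka.EurekaRibbonClientConfiguration, defines the ServerList bean: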

    @Bean
    @ConditionalOnMissingBean
    public ServerList<?> ribbonServerList(IClientConfig config,
            Provider<EurekaClient> eurekaClientProvider) {
        if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
            return this.propertiesFactory.get(ServerList.class, config, serviceId);
        }
        DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
                config, eurekaClientProvider);
        DomainExtractingServerList serverList = new DomainExtractingServerList(
                discoveryServerList, config, this.approximateZoneFromHostname);
        return serverList;
    }

By default, then, the ServerList is a DomainExtractingServerList that wraps a DiscoveryEnabledNIWSServerList. Here is how DomainExtractingServerList implements the two ServerList methods:

/**
 * @author Dave Syer
 */
public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

    private ServerList<DiscoveryEnabledServer> list;

    private final RibbonProperties ribbon;

    private boolean approximateZoneFromHostname;

    public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
            IClientConfig clientConfig, boolean approximateZoneFromHostname) {
        this.list = list;
        this.ribbon = RibbonProperties.from(clientConfig);
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }

    @Override
    public List<DiscoveryEnabledServer> getInitialListOfServers() {
        List<DiscoveryEnabledServer> servers = setZones(
                this.list.getInitialListOfServers());
        return servers;
    }

    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        List<DiscoveryEnabledServer> servers = setZones(
                this.list.getUpdatedListOfServers());
        return servers;
    }

    private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
        List<DiscoveryEnabledServer> result = new ArrayList<>();
        boolean isSecure = this.ribbon.isSecure(true);
        boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
        for (DiscoveryEnabledServer server : servers) {
            result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
                    this.approximateZoneFromHostname));
        }
        return result;
    }

}

class DomainExtractingServer extends DiscoveryEnabledServer {

    private String id;

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void setId(String id) {
        this.id = id;
    }

    DomainExtractingServer(DiscoveryEnabledServer server, boolean useSecurePort,
            boolean useIpAddr, boolean approximateZoneFromHostname) {
        // host and port are set in super()
        super(server.getInstanceInfo(), useSecurePort, useIpAddr);
        if (server.getInstanceInfo().getMetadata().containsKey("zone")) {
            setZone(server.getInstanceInfo().getMetadata().get("zone"));
        }
        else if (approximateZoneFromHostname) {
            setZone(ZoneUtils.extractApproximateZone(server.getHost()));
        }
        else {
            setZone(server.getZone());
        }
        setId(extractId(server));
        setAlive(server.isAlive());
        setReadyToServe(server.isReadyToServe());
    }

    private String extractId(Server server) {
        if (server instanceof DiscoveryEnabledServer) {
            DiscoveryEnabledServer enabled = (DiscoveryEnabledServer) server;
            InstanceInfo instance = enabled.getInstanceInfo();
            if (instance.getMetadata().containsKey("instanceId")) {
                return instance.getHostName() + ":"
                        + instance.getMetadata().get("instanceId");
            }
        }
        return super.getId();
    }

}

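  Both methods call the corresponding method of the wrapped ServerList and then pass the result to setZones, which converts each returned server into a custom DomainExtractingServer. That class mainly sets the id, zone, alive flag and readyToServe flag. The zone comes from the instance's "zone" metadata if present, otherwise it is approximated from the host name when approximateZoneFromHostname is enabled, otherwise the server's own zone is kept; the id becomes hostName:instanceId when "instanceId" metadata exists.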
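The zone and id rules of DomainExtractingServer can be expressed as small pure functions. This is only a sketch: ZoneResolutionSketch is hypothetical, and approximateZone is an assumed stand-in for ZoneUtils.extractApproximateZone that takes everything after the first dot of the host name.

```java
import java.util.Map;

// Hypothetical helper mirroring the zone and id rules of DomainExtractingServer.
final class ZoneResolutionSketch {
    // Priority: "zone" metadata, then (if enabled) a zone approximated from the host name,
    // then the zone already set on the server.
    static String resolveZone(Map<String, String> metadata, String host,
                              String serverZone, boolean approximateZoneFromHostname) {
        if (metadata.containsKey("zone")) {
            return metadata.get("zone");
        }
        if (approximateZoneFromHostname) {
            return approximateZone(host);
        }
        return serverZone;
    }

    // Assumed stand-in for ZoneUtils.extractApproximateZone: everything after the first '.'.
    static String approximateZone(String host) {
        int dot = host.indexOf('.');
        return dot < 0 ? host : host.substring(dot + 1);
    }

    // With "instanceId" metadata the id becomes hostName:instanceId; otherwise keep the default id.
    static String resolveId(Map<String, String> metadata, String hostName, String defaultId) {
        if (metadata.containsKey("instanceId")) {
            return hostName + ":" + metadata.get("instanceId");
        }
        return defaultId;
    }
}
```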

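  Because DomainExtractingServerList is constructed with a DiscoveryEnabledNIWSServerList, those calls actually go to DiscoveryEnabledNIWSServerList, where both methods are handled by obtainServersViaDiscovery. That method asks the Eureka client for the InstanceInfo objects of each VIP address in the comma-separated vipAddresses, iterates over them, and converts every instance whose status is UP into a DiscoveryEnabledServer. If prioritizeVipAddressBasedServers is set, it stops at the first VIP address that yields servers.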

    @Override
    public List<DiscoveryEnabledServer> getInitialListOfServers(){
        return obtainServersViaDiscovery();
    }

    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        return obtainServersViaDiscovery();
    }

    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        }

        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don't want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
        return serverList;
    }
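Stripped of the Eureka client and the port override, the selection logic of obtainServersViaDiscovery reduces to the following sketch. DiscoverySketch, its Instance class and the registry map are hypothetical stand-ins for EurekaClient.getInstancesByVipAddress and InstanceInfo; servers are represented by their host names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical reduction of DiscoveryEnabledNIWSServerList.obtainServersViaDiscovery.
final class DiscoverySketch {
    enum Status { UP, DOWN, OUT_OF_SERVICE }

    // Hypothetical minimal view of an InstanceInfo.
    static final class Instance {
        final String host;
        final Status status;

        Instance(String host, Status status) {
            this.host = host;
            this.status = status;
        }
    }

    // registry maps a VIP address to its registered instances (stands in for the Eureka lookup).
    static List<String> obtainServers(Map<String, List<Instance>> registry, String vipAddresses,
                                      boolean prioritizeVipAddressBasedServers) {
        List<String> servers = new ArrayList<String>();
        if (vipAddresses == null) {
            return servers;
        }
        for (String vip : vipAddresses.split(",")) {
            List<Instance> instances = registry.containsKey(vip)
                    ? registry.get(vip)
                    : Collections.<Instance>emptyList();
            for (Instance ii : instances) {
                if (ii.status == Status.UP) { // only UP instances become servers
                    servers.add(ii.host);
                }
            }
            if (!servers.isEmpty() && prioritizeVipAddressBasedServers) {
                break; // the first VIP address that yields servers wins
            }
        }
        return servers;
    }
}
```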

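(2) ServerListUpdater

  The analysis above showed how the server list is fetched from the Eureka registry. How, then, is the local Server list kept up to date? In the DynamicServerListLoadBalancer source mentioned earlier, we find the following code: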

    volatile ServerList<T> serverListImpl;

    volatile ServerListFilter<T> filter;

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

    protected volatile ServerListUpdater serverListUpdater;

    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

    public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }

    private String getIdentifier() {
        return this.getClientConfig().getClientName();
    }

    public void stopServerListRefreshing() {
        if (serverListUpdater != null) {
            serverListUpdater.stop();
        }
    }

    @Monitor(name="LastUpdated", type=DataSourceType.INFORMATIONAL)
    public String getLastUpdate() {
        return serverListUpdater.getLastUpdate();
    }

    @Monitor(name="DurationSinceLastUpdateMs", type= DataSourceType.GAUGE)
    public long getDurationSinceLastUpdateMs() {
        return serverListUpdater.getDurationSinceLastUpdateMs();
    }

    @Monitor(name="NumUpdateCyclesMissed", type=DataSourceType.GAUGE)
    public int getNumberMissedCycles() {
        return serverListUpdater.getNumberMissedCycles();
    }

    @Monitor(name="NumThreads", type=DataSourceType.GAUGE)
    public int getCoreThreads() {
        return serverListUpdater.getCoreThreads();
    }

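  The source shows that the refresh is driven through the ServerListUpdater.UpdateAction interface: enableAndInitLearnNewServersFeature passes updateAction to serverListUpdater.start, and updateAction.doUpdate simply calls DynamicServerListLoadBalancer's own updateListOfServers. That method calls ServerList.getUpdatedListOfServers to fetch the latest server list, passes the result through ServerListFilter.getFilteredListOfServers to obtain the final Server list, and finally calls updateAllServerList to update the local list.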
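The fetch, filter and replace pipeline can be sketched without Ribbon types. ServerSource, ServerFilter, UpdateActionSketch and DynamicListSketch are hypothetical simplifications of ServerList, ServerListFilter, UpdateAction and DynamicServerListLoadBalancer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, minimal counterparts of ServerList, ServerListFilter and UpdateAction.
interface ServerSource<T> {
    List<T> getUpdatedListOfServers();
}

interface ServerFilter<T> {
    List<T> getFilteredListOfServers(List<T> servers);
}

interface UpdateActionSketch {
    void doUpdate();
}

final class DynamicListSketch<T> {
    private final ServerSource<T> source; // may be null
    private final ServerFilter<T> filter; // may be null
    private volatile List<T> allServers = Collections.emptyList();

    // The updater only sees this callback, as ServerListUpdater only sees UpdateAction.
    final UpdateActionSketch updateAction = new UpdateActionSketch() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

    DynamicListSketch(ServerSource<T> source, ServerFilter<T> filter) {
        this.source = source;
        this.filter = filter;
    }

    // Fetch the latest list, filter it if a filter is set, then replace the local list.
    void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (source != null) {
            servers = source.getUpdatedListOfServers();
            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
            }
        }
        // Plays the role of updateAllServerList.
        allServers = Collections.unmodifiableList(new ArrayList<T>(servers));
    }

    List<T> getAllServers() {
        return allServers;
    }
}
```

Keeping the updater behind a one-method callback is what lets Ribbon swap a polling updater for an event-driven one without touching the balancer.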

  Here is the ServerListUpdater interface:

public interface ServerListUpdater {

    public interface UpdateAction {
        void doUpdate();
    }

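    // Starts the updater; the UpdateAction passed in performs the actual update
    void start(UpdateAction updateAction);

    // Stops the updater
    void stop();

    // Returns the time of the last update
    String getLastUpdate();

    // Returns the time elapsed since the last update, in milliseconds
    long getDurationSinceLastUpdateMs();

    // Returns the number of missed update cycles
    int getNumberMissedCycles();

    // Returns the number of core threads
    int getCoreThreads();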
}

  ServerListUpdater has two implementations, EurekaNotificationServerListUpdater and PollingServerListUpdater. PollingServerListUpdater is the default, so by default DynamicServerListLoadBalancer refreshes its server list with a scheduled task. EurekaNotificationServerListUpdater can also serve as the updater for DynamicServerListLoadBalancer; it relies on a Eureka event listener to trigger server list updates.

  Here is the start method of the default updater:

    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

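  The source shows that start schedules a wrapper task on a scheduled thread pool with scheduleWithFixedDelay. The task calls updateAction.doUpdate(), i.e. the doUpdate implemented in DynamicServerListLoadBalancer and described above, and catches and logs any exception so that one failed cycle does not stop the schedule. By default initialDelayMs is 1000 and refreshIntervalMs is 30 * 1000, so the first update runs one second after start is called, and each later update runs 30 seconds after the previous one finishes.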
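A stripped-down version of this polling pattern is sketched below. PollingUpdaterSketch is hypothetical; it keeps the compareAndSet guard and the fixed-delay schedule but takes a plain Runnable as the update action. The try/catch matters: per the ScheduledExecutorService contract, if any run of a scheduleWithFixedDelay task throws, all later runs are suppressed.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified counterpart of PollingServerListUpdater.
final class PollingUpdaterSketch {
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "server-list-updater-sketch");
                t.setDaemon(true); // do not keep the JVM alive just for refreshes
                return t;
            });
    private final long initialDelayMs;
    private final long refreshIntervalMs;
    private volatile ScheduledFuture<?> scheduledFuture;
    private volatile long lastUpdated = -1;

    PollingUpdaterSketch(long initialDelayMs, long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    synchronized void start(final Runnable updateAction) {
        if (!isActive.compareAndSet(false, true)) {
            return; // already active: no-op
        }
        scheduledFuture = executor.scheduleWithFixedDelay(() -> {
            try {
                updateAction.run();
                lastUpdated = System.currentTimeMillis();
            } catch (RuntimeException e) {
                // Swallow the failure; an escaping exception would cancel all later runs.
            }
        }, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
    }

    synchronized void stop() {
        if (isActive.compareAndSet(true, false) && scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        executor.shutdownNow(); // this sketch is not restartable after stop()
    }

    long getLastUpdated() {
        return lastUpdated;
    }
}
```

Constructing it as new PollingUpdaterSketch(1000, 30 * 1000) reproduces the default timing described above.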

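  (3) ServerListFilter

  As described in the previous step, updateListOfServers first calls ServerList.getUpdatedListOfServers and then ServerListFilter.getFilteredListOfServers to obtain the final Server list. ServerListFilter has a single method, getFilteredListOfServers, which filters a Server list.

  In the source, AbstractServerListFilter implements ServerListFilter, ZoneAffinityServerListFilter extends AbstractServerListFilter, and DefaultNIWSServerListFilter, ServerListSubsetFilter and ZonePreferenceServerListFilter extend ZoneAffinityServerListFilter.

  Of these, only ZonePreferenceServerListFilter is a Spring Cloud Ribbon extension of Netflix Ribbon; the rest are native Netflix Ribbon implementations.

  a. AbstractServerListFilter

  AbstractServerListFilter is an abstract filter that does exactly one thing: it holds the LoadBalancerStats object needed during filtering (as noted earlier, this object stores the attributes and statistics used by the load balancer).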

public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {

    private volatile LoadBalancerStats stats;
    
    public void setLoadBalancerStats(LoadBalancerStats stats) {
        this.stats = stats;
    }
    
    public LoadBalancerStats getLoadBalancerStats() {
        return stats;
    }

}

  b. ZoneAffinityServerListFilter

    @Override
    public List<T> getFilteredListOfServers(List<T> servers) {
        if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
            List<T> filteredServers = Lists.newArrayList(Iterables.filter(
                    servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
            if (shouldEnableZoneAffinity(filteredServers)) {
                return filteredServers;
            } else if (zoneAffinity) {
                overrideCounter.increment();
            }
        }
        return servers;
    }

    private boolean shouldEnableZoneAffinity(List<T> filtered) {    
        if (!zoneAffinity && !zoneExclusive) {
            return false;
        }
        if (zoneExclusive) {
            return true;
        }
        LoadBalancerStats stats = getLoadBalancerStats();
        if (stats == null) {
            return zoneAffinity;
        } else {
            logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
            ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
            double loadPerServer = snapshot.getLoadPerServer();
            int instanceCount = snapshot.getInstanceCount();            
            int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
            if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get() 
                    || loadPerServer >= activeReqeustsPerServerThreshold.get()
                    || (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
                logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}", 
                        new Object[] {(double) circuitBreakerTrippedCount / instanceCount,  loadPerServer, instanceCount - circuitBreakerTrippedCount});
                return false;
            } else {
                return true;
            }
            
        }
    }

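  In getFilteredListOfServers, when the consumer has a zone and either zone affinity (zoneAffinity) or zone exclusivity (zoneExclusive) is enabled, the filter first uses Iterables.filter(servers, this.zoneAffinityPredicate.getServerOnlyPredicate()) to keep only the instances whose zone matches the consumer's zone. It then calls shouldEnableZoneAffinity to decide whether zone affinity should actually be applied: if so, it returns the filtered list; otherwise it returns the full Server list.

  shouldEnableZoneAffinity always returns true when zoneExclusive is set, and returns zoneAffinity when no LoadBalancerStats is available. Otherwise it calls LoadBalancerStats.getZoneSnapshot on the filtered (same-zone) instances to obtain their basic metrics (instance count, number of tripped circuit breakers, average load per instance) and compares them with configured thresholds; if any threshold is crossed, zone affinity is not applied. This lets the consumer fall back to instances in other zones when its own zone is failing, keeping the cluster highly available. The conditions that disable zone affinity are:

  circuitBreakerTrippedCount / instanceCount >= blackOutServerPercentageThreshold.get(): the share of failed instances (tripped circuit breakers / instance count); default threshold 0.8

  loadPerServer >= activeReqeustsPerServerThreshold.get(): the average load per instance; default threshold 0.6

  (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get(): the number of available instances (instance count minus tripped circuit breakers); default threshold 2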
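The three threshold checks can be written as a pure function. In this sketch (ZoneAffinityCheckSketch is a hypothetical name) the thresholds are hard-coded to the defaults listed above, and only the statistics branch of shouldEnableZoneAffinity is modeled, not the zoneExclusive or missing-stats branches.

```java
// Hypothetical reduction of the threshold branch of shouldEnableZoneAffinity.
final class ZoneAffinityCheckSketch {
    static final double BLACKOUT_PERCENTAGE_THRESHOLD = 0.8;        // blackOutServerPercentageThreshold default
    static final double ACTIVE_REQUESTS_PER_SERVER_THRESHOLD = 0.6; // activeReqeustsPerServerThreshold default
    static final int AVAILABLE_SERVERS_THRESHOLD = 2;               // availableServersThreshold default

    // Returns true when the same-zone instances look healthy enough to use exclusively.
    static boolean shouldEnableZoneAffinity(int instanceCount, int circuitTrippedCount,
                                            double loadPerServer) {
        double blackOutRatio = (double) circuitTrippedCount / instanceCount; // NaN when instanceCount == 0
        int available = instanceCount - circuitTrippedCount;
        boolean overridden = blackOutRatio >= BLACKOUT_PERCENTAGE_THRESHOLD
                || loadPerServer >= ACTIVE_REQUESTS_PER_SERVER_THRESHOLD
                || available < AVAILABLE_SERVERS_THRESHOLD;
        return !overridden;
    }
}
```

For an empty zone the ratio is NaN, which never satisfies the >= check, but the available-instances check still fails, so affinity is disabled.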

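  c. DefaultNIWSServerListFilter

  This filter inherits everything from ZoneAffinityServerListFilter and adds no code of its own; it is the default NIWS (Netflix Internal Web Service) filter.

  d. ServerListSubsetFilter

  This filter also extends ZoneAffinityServerListFilter. As its getFilteredListOfServers implementation shows, it is suited to systems with very large server clusters: it produces a subset of the zone-affinity result, and it can selectively drop unhealthy instances by comparing each instance's failure count and concurrent connection count. The implementation has three steps:

    1) Call the parent ZoneAffinityServerListFilter.getFilteredListOfServers to get the candidate instance list.

    2) Remove the relatively unhealthy instances from the subset the consumer currently maintains. An instance counts as unhealthy if:

          its concurrent (active) request count exceeds the client-configured threshold (default 0);

          its failure count exceeds the client-configured threshold (default 0).

          If, after those removals, fewer instances have been removed than 10% (the default elimination percentage) of the target subset size, the remaining instances are sorted by health and removed starting with the least healthy until that 10% is reached. If the subset is larger than the target size, the excess is removed the same way.

    3) After elimination the subset has lost at least 10% of its instances, so instances are picked at random from the candidate pool and added until the subset is back to the target size (20 by default).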

    @Override
    public List<T> getFilteredListOfServers(List<T> servers) {
        List<T> zoneAffinityFiltered = super.getFilteredListOfServers(servers);
        Set<T> candidates = Sets.newHashSet(zoneAffinityFiltered);
        Set<T> newSubSet = Sets.newHashSet(currentSubset);
        LoadBalancerStats lbStats = getLoadBalancerStats();
        for (T server: currentSubset) {
            // this server is either down or out of service
            if (!candidates.contains(server)) {
                newSubSet.remove(server);
            } else {
                ServerStats stats = lbStats.getSingleServerStat(server);
                // remove the servers that do not meet health criteria
                if (stats.getActiveRequestsCount() > eliminationConnectionCountThreshold.get()
                        || stats.getFailureCount() > eliminationFailureCountThreshold.get()) {
                    newSubSet.remove(server);
                    // also remove from the general pool to avoid selecting them again
                    candidates.remove(server);
                }
            }
        }
        int targetedListSize = sizeProp.get();
        int numEliminated = currentSubset.size() - newSubSet.size();
        int minElimination = (int) (targetedListSize * eliminationPercent.get());
        int numToForceEliminate = 0;
        if (targetedListSize < newSubSet.size()) {
            // size is shrinking
            numToForceEliminate = newSubSet.size() - targetedListSize;
        } else if (minElimination > numEliminated) {
            numToForceEliminate = minElimination - numEliminated; 
        }
        
        if (numToForceEliminate > newSubSet.size()) {
            numToForceEliminate = newSubSet.size();
        }

        if (numToForceEliminate > 0) {
            List<T> sortedSubSet = Lists.newArrayList(newSubSet);           
            Collections.sort(sortedSubSet, this);
            List<T> forceEliminated = sortedSubSet.subList(0, numToForceEliminate);
            newSubSet.removeAll(forceEliminated);
            candidates.removeAll(forceEliminated);
        }
        
        // after forced elimination or elimination of unhealthy instances,
        // the size of the set may be less than the targeted size,
        // then we just randomly add servers from the big pool
        if (newSubSet.size() < targetedListSize) {
            int numToChoose = targetedListSize - newSubSet.size();
            candidates.removeAll(newSubSet);
            if (numToChoose > candidates.size()) {
                // Not enough healthy instances to choose, fallback to use the
                // total server pool
                candidates = Sets.newHashSet(zoneAffinityFiltered);
                candidates.removeAll(newSubSet);
            }
            List<T> chosen = randomChoose(Lists.newArrayList(candidates), numToChoose);
            for (T server: chosen) {
                newSubSet.add(server);
            }
        }
        currentSubset = newSubSet;       
        return Lists.newArrayList(newSubSet);            
    }
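The three steps can be condensed into the following sketch. SubsetFilterSketch and its Health class are hypothetical; servers are plain strings, health numbers come from a map instead of LoadBalancerStats, and the thresholds, target size and Random are passed in explicitly so the behavior is easy to test. Step 1 is assumed to have already produced zoneFiltered.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Hypothetical sketch of ServerListSubsetFilter's subset maintenance.
final class SubsetFilterSketch {
    // Hypothetical per-server health numbers standing in for ServerStats.
    static final class Health {
        final long failures;
        final int activeRequests;

        Health(long failures, int activeRequests) {
            this.failures = failures;
            this.activeRequests = activeRequests;
        }
    }

    static List<String> filter(List<String> zoneFiltered, Set<String> currentSubset,
                               final Map<String, Health> health, int targetSize,
                               double eliminationPercent, long failureThreshold,
                               int connectionThreshold, Random random) {
        Set<String> candidates = new LinkedHashSet<String>(zoneFiltered);
        Set<String> subset = new LinkedHashSet<String>(currentSubset);
        // Step 2a: drop members that disappeared or exceed a health threshold.
        for (String s : currentSubset) {
            Health h = healthOf(health, s);
            if (!candidates.contains(s)) {
                subset.remove(s);
            } else if (h.activeRequests > connectionThreshold || h.failures > failureThreshold) {
                subset.remove(s);
                candidates.remove(s); // do not pick it again in step 3
            }
        }
        // Step 2b: shrink to the target size, or force-eliminate the least healthy
        // members until at least targetSize * eliminationPercent have been removed.
        int eliminated = currentSubset.size() - subset.size();
        int minElimination = (int) (targetSize * eliminationPercent);
        int toForce = subset.size() > targetSize
                ? subset.size() - targetSize
                : Math.max(0, minElimination - eliminated);
        toForce = Math.min(toForce, subset.size());
        if (toForce > 0) {
            List<String> sorted = new ArrayList<String>(subset);
            Collections.sort(sorted, new Comparator<String>() {
                @Override
                public int compare(String a, String b) { // least healthy first
                    Health ha = healthOf(health, a);
                    Health hb = healthOf(health, b);
                    int byFailures = Long.compare(hb.failures, ha.failures);
                    return byFailures != 0 ? byFailures
                            : Integer.compare(hb.activeRequests, ha.activeRequests);
                }
            });
            List<String> forced = new ArrayList<String>(sorted.subList(0, toForce));
            subset.removeAll(forced);
            candidates.removeAll(forced);
        }
        // Step 3: refill at random up to the target size.
        if (subset.size() < targetSize) {
            int toChoose = targetSize - subset.size();
            candidates.removeAll(subset);
            if (toChoose > candidates.size()) { // not enough healthy ones: use the whole pool
                candidates = new LinkedHashSet<String>(zoneFiltered);
                candidates.removeAll(subset);
            }
            List<String> pool = new ArrayList<String>(candidates);
            Collections.shuffle(pool, random);
            subset.addAll(pool.subList(0, Math.min(toChoose, pool.size())));
        }
        return new ArrayList<String>(subset);
    }

    private static Health healthOf(Map<String, Health> health, String server) {
        Health h = health.get(server);
        return h != null ? h : new Health(0, 0);
    }
}
```

With a target size of 4 and an elimination percentage of 0.5, one unhealthy member is dropped, one more is force-eliminated to reach the minimum of two, and two fresh candidates are drawn to refill the subset.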

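  e. ZonePreferenceServerListFilter

  This filter was added by Spring Cloud. It extends ZoneAffinityServerListFilter and, when Spring Cloud integrates Eureka with Ribbon, it is the default filter. It keeps the instances in the consumer's own zone, where the zone comes from configuration or from the Eureka instance metadata.

  As the source below shows, it first calls ZoneAffinityServerListFilter.getFilteredListOfServers. If the consumer has a zone and the parent returned the full list (that is, the parent did not already narrow it by zone), it keeps only the instances whose zone matches the consumer's zone, ignoring case. If that yields nothing, it returns the parent's result unchanged; otherwise it returns the filtered list.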

    @Override
    public List<Server> getFilteredListOfServers(List<Server> servers) {
        List<Server> output = super.getFilteredListOfServers(servers);
        if (this.zone != null && output.size() == servers.size()) {
            List<Server> local = new ArrayList<>();
            for (Server server : output) {
                if (this.zone.equalsIgnoreCase(server.getZone())) {
                    local.add(server);
                }
            }
            if (!local.isEmpty()) {
                return local;
            }
        }
        return output;
    }
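The zone preference step on its own looks like this. ZonePreferenceSketch and its Node class are hypothetical; parentOutput stands for whatever the ZoneAffinityServerListFilter step returned.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of ZonePreferenceServerListFilter's extra step.
final class ZonePreferenceSketch {
    // Hypothetical server with an id and a zone.
    static final class Node {
        final String id;
        final String zone;

        Node(String id, String zone) {
            this.id = id;
            this.zone = zone;
        }
    }

    // input: the unfiltered list; parentOutput: what the zone-affinity step returned.
    static List<Node> prefer(String myZone, List<Node> input, List<Node> parentOutput) {
        if (myZone != null && parentOutput.size() == input.size()) {
            List<Node> local = new ArrayList<Node>();
            for (Node n : parentOutput) {
                if (myZone.equalsIgnoreCase(n.zone)) {
                    local.add(n);
                }
            }
            if (!local.isEmpty()) {
                return local; // same-zone instances exist: use only those
            }
        }
        return parentOutput; // otherwise fall back to the parent's result
    }
}
```

The size comparison is the cheap way to tell whether the parent already applied zone affinity: if it did, the list is returned as-is.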

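  4. ZoneAwareLoadBalancer

  ZoneAwareLoadBalancer extends DynamicServerListLoadBalancer. DynamicServerListLoadBalancer does not override chooseServer, so it uses the algorithm implemented in BaseLoadBalancer, which by default is RoundRobinRule, picking instances in turn. That rule has no notion of zones: it treats all instances as a single zone, so it periodically routes requests across zones. Cross-zone calls add latency, and instances in other zones are meant to provide high availability when a zone partially fails, not to serve routine traffic, so in a multi-zone deployment this behavior inevitably costs performance. How does ZoneAwareLoadBalancer avoid the problem?

  First, ZoneAwareLoadBalancer does not override setServersList, so the main logic for updating the instance list is unchanged; it does, however, override the parent's setServerListForZones method. In the parent's call chain, setServersList ends by calling setServerListForZones, and the parent's setServerListForZones stores a map from each zone to the list of Servers in that zone into LoadBalancerStats: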

    @Override
    public void setServersList(List lsrv) {
        super.setServersList(lsrv);
        List<T> serverList = (List<T>) lsrv;
        Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
        for (Server server : serverList) {
            // make sure ServerStats is created to avoid creating them on hot
            // path
            getLoadBalancerStats().getSingleServerStat(server);
            String zone = server.getZone();
            if (zone != null) {
                zone = zone.toLowerCase();
                List<Server> servers = serversInZones.get(zone);
                if (servers == null) {
                    servers = new ArrayList<Server>();
                    serversInZones.put(zone, servers);
                }
                servers.add(server);
            }
        }
        setServerListForZones(serversInZones);
    }

    protected void setServerListForZones(
            Map<String, List<Server>> zoneServersMap) {
        LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
        getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
    }

  ZoneAwareLoadBalancer overrides setServerListForZones as follows:

    @Override
    protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        super.setServerListForZones(zoneServersMap);
        if (balancers == null) {
            balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
        }
        for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
            getLoadBalancer(zone).setServersList(entry.getValue());
        }
        // check if there is any zone that no longer has a server
        // and set the list to empty so that the zone related metrics does not
        // contain stale data
        for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
            if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                existingLBEntry.getValue().setServersList(Collections.emptyList());
            }
        }
    }   

    @VisibleForTesting
    BaseLoadBalancer getLoadBalancer(String zone) {
        zone = zone.toLowerCase();
        BaseLoadBalancer loadBalancer = balancers.get(zone);
        if (loadBalancer == null) {
            // We need to create rule object for load balancer for each zone
            IRule rule = cloneRule(this.getRule());
            loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
            BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
            if (prev != null) {
                loadBalancer = prev;
            }
        } 
        return loadBalancer;        
    }

    private IRule cloneRule(IRule toClone) {
        IRule rule;
        if (toClone == null) {
            rule = new AvailabilityFilteringRule();
        } else {
            String ruleClass = toClone.getClass().getName();                
            try {
                rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(ruleClass, this.getClientConfig());
            } catch (Exception e) {
                throw new RuntimeException("Unexpected exception creating rule for ZoneAwareLoadBalancer", e);
            }
        }
        return rule;
    }

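  As the source shows, ZoneAwareLoadBalancer keeps a ConcurrentHashMap named balancers. It stores one load balancer per zone, keyed by the lower-cased zone name.

  The per-zone balancers are created by getLoadBalancer, which also gives each one its own IRule. cloneRule creates a new instance of the parent balancer's rule class, or falls back to AvailabilityFilteringRule when the parent has no rule. If another thread registers a balancer for the same zone first, putIfAbsent returns that existing instance and it is used instead.

  Once the balancer is obtained, setServerListForZones calls its setServersList method to give it that zone's instance list. The second loop looks for zones that no longer contain any instances and sets their lists to empty, so that zone-related metrics do not keep stale data.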
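  To make the per-zone bookkeeping concrete, here is a minimal Java sketch of the same pattern without Ribbon types. ZoneBalancer and ZoneBalancerRegistry are hypothetical names, servers are plain Strings, and rule cloning is omitted. ConcurrentHashMap.computeIfAbsent stands in for the get/putIfAbsent sequence in getLoadBalancer. The sketch lower-cases zone names both when storing them and when checking for stale zones.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for BaseLoadBalancer: it only remembers its server list.
class ZoneBalancer {
    final String name;
    private volatile List<String> servers = Collections.emptyList();

    ZoneBalancer(String name) {
        this.name = name;
    }

    void setServersList(List<String> newServers) {
        servers = Collections.unmodifiableList(new ArrayList<>(newServers));
    }

    List<String> getServers() {
        return servers;
    }
}

// Hypothetical registry mirroring the balancers map in ZoneAwareLoadBalancer.
class ZoneBalancerRegistry {
    private final String clientName;
    private final ConcurrentHashMap<String, ZoneBalancer> balancers = new ConcurrentHashMap<>();

    ZoneBalancerRegistry(String clientName) {
        this.clientName = clientName;
    }

    // Lazily create one balancer per lower-cased zone; computeIfAbsent makes
    // concurrent callers share a single instance, as putIfAbsent does in Ribbon.
    ZoneBalancer getLoadBalancer(String zone) {
        String key = zone.toLowerCase();
        return balancers.computeIfAbsent(key, z -> new ZoneBalancer(clientName + "_" + z));
    }

    void setServerListForZones(Map<String, List<String>> zoneServersMap) {
        Set<String> currentZones = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
            currentZones.add(zone);
            getLoadBalancer(zone).setServersList(entry.getValue());
        }
        // Zones that disappeared keep their balancer but get an empty list,
        // so per-zone data does not keep reporting stale servers.
        for (Map.Entry<String, ZoneBalancer> entry : balancers.entrySet()) {
            if (!currentZones.contains(entry.getKey())) {
                entry.getValue().setServersList(Collections.emptyList());
            }
        }
    }
}
```

  computeIfAbsent makes lazy creation atomic per key. Ribbon's explicit putIfAbsent has the same effect, except that under contention it may build an extra balancer and rule and then discard it.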

  Having seen how this load balancer maintains its per-zone instance lists, we can now look at how it selects an instance. That happens in chooseServer:

    @Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
            }

            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }

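  The method first checks two conditions: whether zone-aware logic is disabled (ENABLED is false), and whether the load balancer has at most one available zone (getLoadBalancerStats().getAvailableZones().size() <= 1). If either holds, it skips zone selection and delegates directly to the parent class via super.chooseServer(key).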

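  When there is more than one available zone, the method first calls ZoneAvoidanceRule.createSnapshot(lbStats) to build a map of per-zone snapshots. It also lazily reads two thresholds from dynamic properties:
  ZoneAwareNIWSDiscoveryLoadBalancer.[client name].triggeringLoadPerServerThreshold (default 0.2)
  ZoneAwareNIWSDiscoveryLoadBalancer.[client name].avoidZoneWithBlackoutPercetage (default 0.99999)

  It then calls ZoneAvoidanceRule.getAvailableZones to obtain the set of available zones. If that set is non-null and smaller than the total number of zones, at least one zone was filtered out. In that case ZoneAvoidanceRule.randomChooseZone picks one of the available zones at random. getLoadBalancer then returns that zone's load balancer, which carries its own IRule, and that balancer's chooseServer method selects the instance.

  The method falls back to super.chooseServer(key) in three cases: no zone was filtered out, no server was found, or an exception was thrown.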
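  The zone pick can be sketched as a weighted random choice. As far as I understand, Ribbon's ZoneAvoidanceRule.randomChooseZone weights each candidate zone by its instance count rather than choosing uniformly, and the sketch below assumes that behavior. WeightedZonePicker is a hypothetical name. Instance counts are passed as a plain Map instead of ZoneSnapshot objects, and the Random is injected so results can be reproduced.

```java
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Hypothetical helper: picks a zone from chooseFrom with probability
// proportional to its instance count (assumed to match randomChooseZone).
class WeightedZonePicker {
    static String randomChooseZone(Map<String, Integer> instanceCounts,
                                   Set<String> chooseFrom, Random random) {
        if (chooseFrom == null || chooseFrom.isEmpty()) {
            return null;
        }
        String first = chooseFrom.iterator().next();
        if (chooseFrom.size() == 1) {
            return first;
        }
        int total = 0;
        for (String zone : chooseFrom) {
            total += instanceCounts.getOrDefault(zone, 0);
        }
        if (total <= 0) {
            return first; // no weights to go on; Random.nextInt(0) would throw
        }
        // Draw a ticket in [1, total]; the first zone whose running total
        // reaches the ticket wins, so larger zones win more often.
        int ticket = random.nextInt(total) + 1;
        int runningTotal = 0;
        for (String zone : chooseFrom) {
            runningTotal += instanceCounts.getOrDefault(zone, 0);
            if (ticket <= runningTotal) {
                return zone;
            }
        }
        return first; // not reached when total > 0
    }
}
```

  With this weighting, a zone with three times as many instances is chosen roughly three times as often. That keeps the per-instance load roughly even across zones.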

  ZoneAvoidanceRule.getAvailableZones, which computes the set of available zones, deserves a closer look:

    public static Set<String> getAvailableZones(
            Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
            double triggeringBlackoutPercentage) {
        if (snapshot.isEmpty()) {
            return null;
        }
        Set<String> availableZones = new HashSet<String>(snapshot.keySet());
        if (availableZones.size() == 1) {
            return availableZones;
        }
        Set<String> worstZones = new HashSet<String>();
        double maxLoadPerServer = 0;
        boolean limitedZoneAvailability = false;

        for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
            String zone = zoneEntry.getKey();
            ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
            int instanceCount = zoneSnapshot.getInstanceCount();
            if (instanceCount == 0) {
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
                double loadPerServer = zoneSnapshot.getLoadPerServer();
                if (((double) zoneSnapshot.getCircuitTrippedCount())
                        / instanceCount >= triggeringBlackoutPercentage
                        || loadPerServer < 0) {
                    availableZones.remove(zone);
                    limitedZoneAvailability = true;
                } else {
                    if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                        // they are the same considering double calculation
                        // round error
                        worstZones.add(zone);
                    } else if (loadPerServer > maxLoadPerServer) {
                        maxLoadPerServer = loadPerServer;
                        worstZones.clear();
                        worstZones.add(zone);
                    }
                }
            }
        }

        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
            // zone override is not needed here
            return availableZones;
        }
        String zoneToAvoid = randomChooseZone(snapshot, worstZones);
        if (zoneToAvoid != null) {
            availableZones.remove(zoneToAvoid);
        }
        return availableZones;

    }

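  As the source shows, the method first removes three kinds of zones from the candidate set:
  (1) zones with no instances;
  (2) zones whose average load per server is negative (loadPerServer < 0);
  (3) zones whose failure ratio, i.e. instances with a tripped circuit breaker divided by the instance count, is greater than or equal to triggeringBlackoutPercentage (default 0.99999).

  Among the remaining zones, it then finds the worst zone or zones: those with the highest average load per server. Zones whose load differs from the maximum by less than 0.000001 count as tied.

  If no zone was removed in the first step and the highest average load per server is below triggeringLoad (default 0.2), all zones are returned as available. Otherwise, randomChooseZone picks one of the worst zones at random, and that zone is removed from the available set.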
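  The filtering steps above can be condensed into a simplified Java sketch. ZoneStats and ZoneFilter are hypothetical names, and ZoneStats carries only the three ZoneSnapshot values that getAvailableZones reads. For brevity, the sketch removes a worst zone with a uniform random pick instead of calling randomChooseZone. It therefore matches Ribbon exactly only when there is a single worst zone.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Hypothetical stand-in for ZoneSnapshot with only the fields used below.
final class ZoneStats {
    final int instanceCount;
    final int circuitTrippedCount;
    final double loadPerServer;

    ZoneStats(int instanceCount, int circuitTrippedCount, double loadPerServer) {
        this.instanceCount = instanceCount;
        this.circuitTrippedCount = circuitTrippedCount;
        this.loadPerServer = loadPerServer;
    }
}

final class ZoneFilter {
    static Set<String> availableZones(Map<String, ZoneStats> snapshot, double triggeringLoad,
                                      double triggeringBlackoutPercentage, Random random) {
        if (snapshot.isEmpty()) {
            return null;
        }
        Set<String> available = new HashSet<>(snapshot.keySet());
        if (available.size() == 1) {
            return available;
        }
        Set<String> worstZones = new HashSet<>();
        double maxLoadPerServer = 0;
        boolean limitedZoneAvailability = false;
        for (Map.Entry<String, ZoneStats> entry : snapshot.entrySet()) {
            String zone = entry.getKey();
            ZoneStats stats = entry.getValue();
            // Step 1: drop empty zones, zones at or over the blackout ratio,
            // and zones with a negative load per server.
            if (stats.instanceCount == 0
                    || (double) stats.circuitTrippedCount / stats.instanceCount >= triggeringBlackoutPercentage
                    || stats.loadPerServer < 0) {
                available.remove(zone);
                limitedZoneAvailability = true;
            } else if (Math.abs(stats.loadPerServer - maxLoadPerServer) < 0.000001d) {
                // Step 2a: tie with the current maximum (within rounding error).
                worstZones.add(zone);
            } else if (stats.loadPerServer > maxLoadPerServer) {
                // Step 2b: new maximum load per server.
                maxLoadPerServer = stats.loadPerServer;
                worstZones.clear();
                worstZones.add(zone);
            }
        }
        // Step 3: nothing removed and every zone below the load threshold -> keep all.
        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
            return available;
        }
        // Otherwise drop one worst zone (uniform pick here, not Ribbon's weighted pick).
        if (!worstZones.isEmpty()) {
            List<String> candidates = new ArrayList<>(worstZones);
            available.remove(candidates.get(random.nextInt(candidates.size())));
        }
        return available;
    }
}
```

  The logic has one consequence that is easy to miss: once any zone has been excluded in the first step, the worst remaining zone is also dropped, even when its load is below triggeringLoad.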