1. 程式人生 > >Dubbo分析之Cluster層

Dubbo分析之Cluster層

前言
本文繼續分析dubbo的cluster層,此層封裝多個提供者的路由及負載均衡,並橋接註冊中心,以Invoker為中心,擴充套件介面為Cluster, Directory, Router, LoadBalance;
Cluster介面
整個cluster層可以使用如下圖片概括:

Dubbo分析之Cluster層

各節點關係:
這裡的Invoker是Provider的一個可呼叫Service的抽象,Invoker封裝了Provider地址及Service介面資訊;
Directory代表多個Invoker,可以把它看成List,但與List不同的是,它的值可能是動態變化的,比如註冊中心推送變更;
Cluster將Directory中的多個Invoker偽裝成一個 Invoker,對上層透明,偽裝過程包含了容錯邏輯,呼叫失敗後,重試另一個;

br/>Router負責從多個Invoker中按路由規則選出子集,比如讀寫分離,應用隔離等;
LoadBalance負責從多個Invoker中選出具體的一個用於本次呼叫,選的過程包含了負載均衡演算法,呼叫失敗後,需要重選;
Cluster經過目錄,路由,負載均衡獲取到一個可用的Invoker,交給上層呼叫,介面如下:
@SPI(FailoverCluster.NAME)
public interface Cluster {

/**
 * Merge the directory invokers to a virtual invoker.
 *
 * @param <T>
 * @param directory
 * @return cluster invoker
 * @throws RpcException
 */
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;

}
Cluster是一個叢集容錯介面,經過路由,負載均衡之後獲取的Invoker,由容錯機制來處理,dubbo提供了多種容錯機制包括:
Failover Cluster:失敗自動切換,當出現失敗,重試其它伺服器 [1]。通常用於讀操作,但重試會帶來更長延遲。可通過 retries=”2″ 來設定重試次數(不含第一次)。
Failfast Cluster:快速失敗,只發起一次呼叫,失敗立即報錯。通常用於非冪等性的寫操作,比如新增記錄。
Failsafe Cluster:失敗安全,出現異常時,直接忽略。通常用於寫入審計日誌等操作。
Failback Cluster:失敗自動恢復,後臺記錄失敗請求,定時重發。通常用於訊息通知操作。
Forking Cluster:並行呼叫多個伺服器,只要一個成功即返回。通常用於實時性要求較高的讀操作,但需要浪費更多服務資源。可通過 forks=”2″ 來設定最大並行數。
Broadcast Cluster:廣播呼叫所有提供者,逐個呼叫,任意一臺報錯則報錯 [2]。通常用於通知所有提供者更新快取或日誌等本地資源資訊。
預設使用了FailoverCluster,失敗的時候會預設重試其他伺服器,預設為兩次;當然也可以擴充套件其他的容錯機制;看一下預設的FailoverCluster容錯機制,具體原始碼在FailoverClusterInvoker中:
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate invokers

.
//NOTE: if invokers changed, then invoked also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + invocation.getMethodName()

  • " in the service " + getInterface().getName()
  • " was successful by the provider " + invoker.getUrl().getAddress()
  • ", but there have been failed providers " + providers
  • " (" + providers.size() + "/" + copyinvokers.size()
  • ") from the registry " + directory.getUrl().getAddress()
  • " on the consumer " + NetUtils.getLocalHost()
  • " using the dubbo version " + Version.getVersion() + ". Last error is: "
  • le.getMessage(), le);
    }
    return result;
    } catch (RpcException e) {
    if (e.isBiz()) { // biz exception.
    throw e;
    }
    le = e;
    } catch (Throwable e) {
    le = new RpcException(e.getMessage(), e);
    } finally {
    providers.add(invoker.getUrl().getAddress());
    }
    }
    throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "

    • invocation.getMethodName() + " in the service " + getInterface().getName()
    • ". Tried " + len + " times of the providers " + providers
    • " (" + providers.size() + "/" + copyinvokers.size()
    • ") from the registry " + directory.getUrl().getAddress()
    • " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
    • Version.getVersion() + ". Last error is: "
    • (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
      }
      invocation是客戶端傳給伺服器的相關引數包括(方法名稱,方法引數,引數值,附件資訊),invokers是經過路由之後的伺服器列表,loadbalance是指定的負載均衡策略;首先檢查invokers是否為空,為空直接拋異常,然後獲取重試的次數預設為2次,接下來就是迴圈呼叫指定次數,如果不是第一次呼叫(表示第一次呼叫失敗),會重新載入伺服器列表,然後通過負載均衡策略獲取唯一的Invoker,最後就是通過Invoker把invocation傳送給伺服器,返回結果Result;
      具體的doInvoke方法是在抽象類AbstractClusterInvoker中被呼叫的:
      public Result invoke(final Invocation invocation) throws RpcException {
      checkWhetherDestroyed();
      LoadBalance loadbalance = null;
      List<Invoker<T>> invokers = list(invocation);
      if (invokers != null && !invokers.isEmpty()) {
      loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
      .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
      }
      RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
      return doInvoke(invocation, invokers, loadbalance);
      }

      protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
      List<Invoker<T>> invokers = directory.list(invocation);
      return invokers;
      }
      首先通過Directory獲取Invoker列表,同時在Directory中也會做路由處理,然後獲取負載均衡策略,最後呼叫具體的容錯策略;下面具體看一下Directory;
      Directory介面
      介面定義如下:
      public interface Directory<T> extends Node {

    /**

    • get service type.
    • @return service type.
      */
      Class<T> getInterface();

    /**

    • list invokers.
    • @return invokers
      */
      List<Invoker<T>> list(Invocation invocation) throws RpcException;

}
目錄服務作用就是獲取指定介面的服務列表,具體實現有兩個:StaticDirectory和RegistryDirectory,同時都繼承於AbstractDirectory;從名字可以大致知道StaticDirectory是一個固定的目錄服務,表示裡面的Invoker列表不會動態改變;RegistryDirectory是一個動態的目錄服務,通過註冊中心動態更新服務列表;list實現在抽象類中:
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
首先檢查目錄是否被銷燬,然後呼叫doList,具體在實現類中定義,最後呼叫路由功能,下面重點看一下StaticDirectory和RegistryDirectory中的doList方法
1.RegistryDirectory
是一個動態的目錄服務,所有可以看到RegistryDirectory同時也繼承了NotifyListener介面,是一個通知介面,註冊中心有服務列表更新的時候,同時通知RegistryDirectory,通知邏輯如下:
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers
refreshInvoker(invokerUrls);
}
此通知介面會接受三種類別的url包括:router(路由),configurator(配置),provider(服務提供方);
路由規則:決定一次dubbo服務呼叫的目標伺服器,分為條件路由規則和指令碼路由規則,並且支援可擴充套件,向註冊中心寫入路由規則的操作通常由監控中心或治理中心的頁面完成;
配置規則:向註冊中心寫入動態配置覆蓋規則 [1]。該功能通常由監控中心或治理中心的頁面完成;
provider:動態提供的服務列表
路由規則和配置規則其實就是對provider服務列表更新和過濾處理,refreshInvoker方法就是根據三種url類別重新整理本地的invoker列表,下面看一下RegistryDirectory實現的doList介面:
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 1. No service provider 2. Service providers are disabled
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
"No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()

  • " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
    }
    List<Invoker<T>> invokers = null;
    Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
    String methodName = RpcUtils.getMethodName(invocation);
    Object[] args = RpcUtils.getArguments(invocation);
    if (args != null && args.length > 0 && args[0] != null
    && (args[0] instanceof String || args[0].getClass().isEnum())) {
    invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
    }
    if (invokers == null) {
    invokers = localMethodInvokerMap.get(methodName);
    }
    if (invokers == null) {
    invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
    }
    if (invokers == null) {
    Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
    if (iterator.hasNext()) {
    invokers = iterator.next();
    }
    }
    }
    return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }
    refreshInvoker處理之後,服務列表已methodInvokerMap存在,一個方法對應服務列表Map>>;
    通過Invocation中指定的方法獲取對應的服務列表,如果具體的方法沒有對應的服務列表,則獲取”*”對應的服務列表;處理完之後就在父類中進行路由處理,路由規則同樣是通過通知介面獲取的,路由規則在下章介紹;
    2.StaticDirectory
    這是一個靜態的目錄服務,裡面的服務列表在初始化的時候就已經存在,並且不會改變;StaticDirectory用得比較少,主要用在服務對多註冊中心的引用;
    protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {

    return invokers;

    }
    因為是靜態的,所有doList方法也很簡單,直接返回記憶體中的服務列表即可;
    Router介面
    路由規則決定一次dubbo服務呼叫的目標伺服器,分為條件路由規則和指令碼路由規則,並且支援可擴充套件,介面如下:
    public interface Router extends Comparable<Router> {

    /**

    • get the router url.
    • @return url
      */
      URL getUrl();

    /**

    • route.
    • @param invokers
    • @param url refer url
    • @param invocation
    • @return routed invokers
    • @throws RpcException
      */
      <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}
介面中提供的route方法通過一定的規則過濾出invokers的一個子集;提供了三個實現類:ScriptRouter,ConditionRouter和MockInvokersSelector
ScriptRouter:指令碼路由規則支援 JDK 指令碼引擎的所有指令碼,比如:javascript, jruby, groovy 等,通過type=javascript引數設定指令碼型別,預設為javascript;
ConditionRouter:基於條件表示式的路由規則,如:host = 10.20.153.10 => host = 10.20.153.11;=> 之前的為消費者匹配條件,所有引數和消費者的 URL 進行對比,=> 之後為提供者地址列表的過濾條件,所有引數和提供者的 URL 進行對比;
MockInvokersSelector:是否被配置為使用mock,此路由器保證只有具有協議MOCK的呼叫者出現在最終的呼叫者列表中,所有其他呼叫者將被排除;
下面重點看一下ScriptRouter原始碼
public ScriptRouter(URL url) {
this.url = url;
String type = url.getParameter(Constants.TYPE_KEY);
this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
String rule = url.getParameterAndDecoded(Constants.RULE_KEY);
if (type == null || type.length() == 0) {
type = Constants.DEFAULT_SCRIPT_TYPE_KEY;
}
if (rule == null || rule.length() == 0) {
throw new IllegalStateException(new IllegalStateException("route rule can not be empty. rule:" + rule));
}
ScriptEngine engine = engines.get(type);
if (engine == null) {
engine = new ScriptEngineManager().getEngineByName(type);
if (engine == null) {
throw new IllegalStateException(new IllegalStateException("Unsupported route rule type: " + type + ", rule: " + rule));
}
engines.put(type, engine);
}
this.engine = engine;
this.rule = rule;
}
構造器分別初始化指令碼引擎(engine)和指令碼程式碼(rule),預設的指令碼引擎是javascript;看一個具體的url:
"script://0.0.0.0/com.foo.BarService?category=routers&dynamic=false&rule=" + URL.encode("(function route(invokers) { ... } (invokers))")
script協議表示一個指令碼協議,rule後面是一段javascript指令碼,傳入的引數是invokers;
(function route(invokers) {
var result = new java.util.ArrayList(invokers.size());
for (i = 0; i < invokers.size(); i ++) {
if ("10.20.153.10".equals(invokers.get(i).getUrl().getHost())) {
result.add(invokers.get(i));
}
}
return result;
} (invokers)); // 表示立即執行方法
如上這段指令碼過濾出host為10.20.153.10,具體是如何執行這段指令碼的,在route方法中:
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
try {
List<Invoker<T>> invokersCopy = new ArrayList<Invoker<T>>(invokers);
Compilable compilable = (Compilable) engine;
Bindings bindings = engine.createBindings();
bindings.put("invokers", invokersCopy);
bindings.put("invocation", invocation);
bindings.put("context", RpcContext.getContext());
CompiledScript function = compilable.compile(rule);
Object obj = function.eval(bindings);
if (obj instanceof Invoker[]) {
invokersCopy = Arrays.asList((Invoker<T>[]) obj);
} else if (obj instanceof Object[]) {
invokersCopy = new ArrayList<Invoker<T>>();
for (Object inv : (Object[]) obj) {
invokersCopy.add((Invoker<T>) inv);
}
} else {
invokersCopy = (List<Invoker<T>>) obj;
}
return invokersCopy;
} catch (ScriptException e) {
//fail then ignore rule .invokers.
logger.error("route error , rule has been ignored. rule: " + rule + ", method:" + invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e);
return invokers;
br/>}
}
首先通過指令碼引擎編譯指令碼,然後執行指令碼,同時傳入Bindings引數,這樣在指令碼中就可以獲取invokers,然後進行過濾;最後來看一下負載均衡策略
LoadBalance介面
在叢集負載均衡時,Dubbo提供了多種均衡策略,預設為random隨機呼叫,可以自行擴充套件負載均衡策略;介面類如下:
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

/**
 * select one invoker in list.
 *
 * @param invokers   invokers.
 * @param url        refer url
 * @param invocation invocation.
 * @return selected invoker.
 */
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}
SPI定義了預設的策略為RandomLoadBalance,提供了一個select方法,通過策略從服務列表中選擇一個invoker;dubbo預設提供了多種策略:
Random LoadBalance:隨機,按權重設定隨機概率,在一個截面上碰撞的概率高,但呼叫量越大分佈越均勻,而且按概率使用權重後也比較均勻,有利於動態調整提供者權重;
RoundRobin LoadBalance:輪詢,按公約後的權重設定輪詢比率;存在慢的提供者累積請求的問題,比如:第二臺機器很慢,但沒掛,當請求調到第二臺時就卡在那,
久而久之,所有請求都卡在調到第二臺上;
LeastActive LoadBalance:最少活躍呼叫數,相同活躍數的隨機,活躍數指呼叫前後計數差;使慢的提供者收到更少請求,因為越慢的提供者的呼叫前後計數差會越大;
ConsistentHash LoadBalance:一致性 Hash,相同引數的請求總是發到同一提供者;當某一臺提供者掛時,原本發往該提供者的請求,基於虛擬節點,平攤到其它提供者,不會引起劇烈變動;
下面重點看一下預設的RandomLoadBalance原始碼
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int totalWeight = 0; // The sum of weights
boolean sameWeight = true; // Every invoker has the same weight?
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // Sum
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = random.nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(random.nextInt(length));
}
首先計算總權重,同時檢查是否每一個服務都有相同的權重;如果總權重大於0並且服務的權重都不相同,則通過權重來隨機選擇,否則直接通過Random函式來隨機;
總結
本文圍繞Cluster層中的幾個重要的介面從上到下來分別介紹,並重點介紹了其中的某些實現類;結合官方提供的呼叫圖,還是很容易理解此層的。