DUBBO服務治理
容錯主要是指服務出現了非業務異常之後采取的一些彌補措施,註意我這裏講的是非業務異常,因為業務異常出現的絕大多數情況都是代碼異常,所以及時采取了重試等邏輯還是會出現一樣的業務異常(代碼出問題了,鍋當然要自己背嘍)。
Dubbo中對於容錯的處理主要集中在Cluster中,Cluster包裝了底層調用的Invoker並且在Cluster自己本層做了一些出現異常之後的處理。
對於Dubbo的容錯主要是有兩層。第一層是mock,第二層是用戶配置的容錯策略。對於集群容錯的包裝邏輯入口就在於RegistryProtocol的doRefer()方法最後的cluster.join(directory),該方法返回了集群包裝過後的invoker,這裏的cluser其實就是MockClusterWrapper(至於為什麽能確定是MockClusterInvoker,就需要大家去理解一下Dubbo的SPI機制了),下面一起來看一下MockClusterInvoke的具體內容:
//真正起作用的是MockClusterInvoker public <T> Invoker<T> join(Directory<T> directory) throws RpcException { //因為cluster下面之後一層包裝,所以這裏的this.cluser就是默認的FialoverCluster return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); } //MockClusterInvoker.invoke() public Result invoke(Invocation invocation) throws RpcException { Result result = null; //獲取URL中配置的mock參數 String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")){ //如果沒有配置mock的話就直接進行調用 result = this.invoker.invoke(invocation); //如果配置了強制的mock就直接調用mock,不走正常調用邏輯 } else if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); } result = doMockInvoke(invocation, null); } else { //調用失敗之後再進行mock操作 try { result = this.invoker.invoke(invocation); }catch (RpcException e) { //mock並不會處理義務異常 if (e.isBiz()) { throw e; } else { if (logger.isWarnEnabled()) { logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); } result = doMockInvoke(invocation, e); } } } return result; } //主要是挑選出可用的MockInvoker類然後調用其invoke方法返回結果 private Result doMockInvoke(Invocation invocation,RpcException e){ Result result = null; Invoker<T> minvoker ; //選取可用的MockInvoker List<Invoker<T>> mockInvokers = selectMockInvoker(invocation); if (mockInvokers == null || mockInvokers.size() == 0){ minvoker = (Invoker<T>) new MockInvoker(directory.getUrl()); } else { minvoker = mockInvokers.get(0); } try { result = minvoker.invoke(invocation); } catch (RpcException me) { //如果是業務異常就封裝結果(註意這裏和上面的區別),因為biz異常是用戶自己自己在mock信息中配置的異常,不是預想之外的異常 if (me.isBiz()) { result = new RpcResult(me.getCause()); } else { throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause()); } } catch (Throwable me) { throw new RpcException(getMockExceptionMessage(e, me), me.getCause()); } return result; } //核心最後還是調用了MockInvoker的invoker方法 public Result invoke(Invocation invocation) throws RpcException { String mock = getUrl().getParameter(invocation.getMethodName()+"."+Constants.MOCK_KEY); if (invocation instanceof RpcInvocation) { ((RpcInvocation) invocation).setInvoker(this); } if (StringUtils.isBlank(mock)){ mock = getUrl().getParameter(Constants.MOCK_KEY); } if (StringUtils.isBlank(mock)){ //這個錯誤比較常見,原因就在於客戶端調用的時候返回的異常信息是非業務異常,但是客戶端又沒有配置mock信息,因此就會拋出這個異常 throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url)); } //解析出mock的類型,如果是mock=fail:AA,就返回AA,如果是mock=xx.Service就返回xx.Service,如果是mock=force:XX,就返回XX mock = normallizeMock(URL.decode(mock)); //如果配置的是:mock=fail:return,就直接返回空結果 if (Constants.RETURN_PREFIX.trim().equalsIgnoreCase(mock.trim())){ RpcResult result = new RpcResult(); result.setValue(null); return result; //如果配置的是mock=fail:return **,就解析**為對應的可返回內容然後返回 } else if (mock.startsWith(Constants.RETURN_PREFIX)) { mock = mock.substring(Constants.RETURN_PREFIX.length()).trim(); mock = mock.replace(‘`‘, ‘"‘); try { Type[] returnTypes = RpcUtils.getReturnTypes(invocation); Object value = parseMockValue(mock, returnTypes); return new RpcResult(value); } catch (Exception ew) { throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: "+ url , ew); } //如果配置的是mock=fail:throw **(用戶自定義的異常信息),就解析**為對應的可返回內容然後返回 } else if (mock.startsWith(Constants.THROW_PREFIX)) { mock = mock.substring(Constants.THROW_PREFIX.length()).trim(); mock = mock.replace(‘`‘, ‘"‘); if (StringUtils.isBlank(mock)){ throw new RpcException(" mocked exception for Service degradation. "); } else { //用戶自定義類 Throwable t = getThrowable(mock); throw new RpcException(RpcException.BIZ_EXCEPTION, t); } } else { //如果mock信息為ServiceMock的話就直接找到對應的Mock類進行mock調用,然後返回結果 try { Invoker<T> invoker = getInvoker(mock); return invoker.invoke(invocation); } catch (Throwable t) { throw new RpcException("Failed to create mock implemention class " + mock , t); } } }
從上面的邏輯上來看mock主要是根據用戶的一些配置,做一些非常具體的容錯邏輯,精確到方法界別的,所以算是容錯的最小粒度了。我們經常在使用中對一些不可靠服務進行mock處理,防止在其出現異常時候影響我們的核心調用流程。
方法級別的容錯總體來說是針對業務異常的一種容錯,而針對非業務異常的容錯邏輯就是另外一個概念了,比如說由於網絡抖動導致某次調用沒有成功,針對類似的異常Dubbo也有自己的容錯措施,具體如下面幾種:
Failover Cluster 失敗自動切換,當出現失敗,重試其它服務器。(缺省) 通常用於讀操作,但重試會帶來更長延遲。
Failfast Cluster 快速失敗,只發起一次調用,失敗立即報錯。通常用於非冪等性的寫操作,比如新增記錄。
Failback Cluster 失敗自動恢復,後臺記錄失敗請求,定時重發。 通常用於消息通知操作。
Forking Cluster 並行調用多個服務器,只要一個成功即返回。通常用於實時性要求較高的讀操作,但需要浪費更多服務資源。可通過forks="2"來設置最大並行數。
Broadcast Cluster 廣播調用所有提供者,逐個調用,任意一臺報錯則報錯。(2.1.0開始支持) 通常用於通知所有提供者更新緩存或日誌等本地資源信息。
因為種類比較多,全部講一遍太費時間了,所以選擇比較常用的幾個進行介紹。
Failover
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
//檢查copyinvokers是否為null
checkInvokers(copyinvokers, invocation);
//重試次數,默認為3次,不包含第一次調用
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// last exception.
RpcException le = null;
//已經調用的Invoker列表
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//重試時,進行重新選擇,避免重試時invoker列表已發生變化.
//註意:如果列表發生了變化,那麽invoked判斷會失效,因為invoker示例已經改變
if (i > 0) {
checkWheatherDestoried();
copyinvokers = list(invocation);
//重新檢查一下有沒有對應的提供者
checkInvokers(copyinvokers, invocation);
}
//通過loadbalance去選出目標Invoker
//這裏默認的LoadBalance是RandomLoadBalance,選擇時候是根據權重來選擇目標的Invoker,當然也可以配置其他的LoadBalance
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
//添加到上下文環境中去,但是這裏為什麽會把失敗的invoker也加進來,感覺失敗的Invoker信息並沒有什麽意義
RpcContext.getContext().setInvokers((List)invoked);
try {
//這裏才是最後的調用,使用經過loadbalance選出的invoker去調用
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
//業務異常不會重試,直接拋出
if (e.isBiz()) {
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}
Failfast
//快速失敗的邏輯最簡單了,就是什麽都不做,有調用異常的話就直接往上拋出
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
return invoker.invoke(invocation);
//簡單區分異常類型
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException)e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
}
ailover與failfast的代碼基本一樣,主要區別就是出現異常之後直接忽略,然後返回空的RpcResult。其余的幾種集群策略在平時用的比較少,我就不多介紹了,其實現也都比較簡單。
Dubbo的容錯不僅體現在provider的cluster,對於註冊中心也有提供cluster內容(AvailableCluster),只不過該內容比較簡單,只是隨機選取了一個可用的註冊中心。
負載均衡
負載均衡的概念:從多個目標服務器中選擇出其中一個服務器供客戶端調用,這個選擇的具體過程就叫做負載均衡(純粹是自己給小白用戶的解釋)。
一般的服務性框架都會有負載均衡的內容,Dubbo也是基於自己的URL機制做了一層負載均衡,我們看到上面的集群內容時候就看到集群內部其實就依賴了負載均衡策略來從多個Invoker中選取其中的一個,只不過一次負載所選擇到的Invoker並不一定能滿足條件,比如在Failover策略下,失敗之後Loadbalance重新選擇的Invoker還是失敗過的那個,那就要重新計算了。
Dubbo的負載均衡策略主要有以下幾種:
Random LoadBalance 隨機選取服務提供者 最簡單的無狀態的負載均衡算法
RoundRobin LoadBalance 以輪訓的方式調用服務提供者 缺點是有狀態,必須在並發之下記住上一次到誰了
LeastActive LoadBalance 調用最少活躍調調用的服務提供者,這裏有一點需要註意,這裏的服務調用統計維度是在方法級別的,也就是說是方法級別的LoadBalance。
ConsistentHash LoadBalance 一致性Hash(用的不多,不多講解)
首先看下LoadBalance的接口就直到它是做什麽的:
//從invokers列表中根據url和invocation信息選出一個合適的Invoker供consumer端調用
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
在所有的LoadBalance中都提到了一個概念:weight。正常情況下我們無論配置在provider還是service中,對應的所有服務端的providerUrl的weight都是一樣的,這種情況其實weight配置不配置意義不大。但是一旦動態針對某個服務調整過weight值,這個影響就出來了。例如:override://10.20.153.10/com.foo.BarService?category=configurators&dynamic=false&weight=200 配置之後,就將10.20.153.10服務器上的com.foo.BarServic的權值改為了200,那麽它在之後的LoadBalance中被選取的可能性就更大了(默認的權值為100)。
Random LoadBalance
private final Random random = new Random();
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // 總個數
int totalWeight = 0; // 總權重
boolean sameWeight = true; // 權重是否都一樣
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // 累計總權重
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false; // 計算所有權重是否一樣
}
}
if (totalWeight > 0 && ! sameWeight) {
// 如果權重不相同且權重大於0則按總權重數隨機
int offset = random.nextInt(totalWeight);
// 隨機數落到哪個片段上,就取哪個片段對應的Invoker對象(擊鼓傳花式往後切割)
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// 如果權重相同或權重為0則均等隨機
return invokers.get(random.nextInt(length));
}
RoundRobin LoadBalance
//輪訓是針對方法級別的,並不是所有服務調用
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size(); // 總個數
int maxWeight = 0; // 最大權重
int minWeight = Integer.MAX_VALUE; // 最小權重
// invoker->weight,IntegerWrapper就是一個簡單的Integer包裝類
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
int weightSum = 0;
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight); // 累計最大權重
minWeight = Math.min(minWeight, weight); // 累計最小權重
if (weight > 0) {
invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
weightSum += weight;
}
}
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
//currentSequence代表某個方法是第多少次被調用的,例如第1W次
int currentSequence = sequence.getAndIncrement();
if (maxWeight > 0 && minWeight < maxWeight) { // 權重不一樣
// 可以把weightSum理解成一個權重範圍內的總集,mod就帶表在這個總集中具體執行到的位置
int mod = currentSequence % weightSum;
//weightSum < maxWeight*length
for (int i = 0; i < maxWeight; i++) {
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
final Invoker<T> k = each.getKey();
final IntegerWrapper v = each.getValue();
//這裏的邏輯比較抽象,本質上就是誰的權重越大,輪詢到誰的次數就越多
if (mod == 0 && v.getValue() > 0) {
return k;
}
if (v.getValue() > 0) {
v.decrement();
mod--;
}
}
}
}
// 取模輪循
return invokers.get(currentSequence % length);
}
簡單題下RoundRobin的弊端:如果某個服務有3臺服務器,權重依次是10,1000,100,可能配置的人的本意是在輪訓的時候走到三臺機器的比例是:1:100:10,但是實際上確實是1000個請求壓倒了第二臺機器上。。。
LeastActive LoadBalance
private final Random random = new Random();
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // 總個數
int leastActive = -1; // 最小的活躍數
int leastCount = 0; // 相同最小活躍數的個數
int[] leastIndexs = new int[length]; // 相同最小活躍數的下標
int totalWeight = 0; // 總權重
int firstWeight = 0; // 第一個權重,用於於計算是否相同
boolean sameWeight = true; // 是否所有權重相同
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活躍數
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 權重
if (leastActive == -1 || active < leastActive) { // 如果是初始情況下或者某臺機器的active數量小於現在保存的leastActive數量,就會重新開始
leastActive = active; // 記錄最小活躍數
leastCount = 1; // 重新統計相同最小活躍數的個數
leastIndexs[0] = i; // 重新記錄最小活躍數下標
totalWeight = weight; // 重新累計總權重
firstWeight = weight; // 記錄第一個權重
sameWeight = true; // 還原權重相同標識
} else if (active == leastActive) { // 累計相同最小的活躍數
leastIndexs[leastCount ++] = i; // 累計相同最小活躍數下標
totalWeight += weight; // 累計總權重
// 判斷所有權重是否一樣
if (sameWeight && i > 0
&& weight != firstWeight) {
sameWeight = false;
}
}
}
// 如果只有一個最小值的話就直接調用
if (leastCount == 1) {
// 如果只有一個最小則直接返回
return invokers.get(leastIndexs[0]);
}
if (! sameWeight && totalWeight > 0) {
// 如果權重不相同且權重大於0則按總權重數隨機
int offsetWeight = random.nextInt(totalWeight);
// 並確定隨機值落在哪個片斷上
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexs[i];
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0)
return invokers.get(leastIndex);
}
}
// 如果權重相同或權重為0則均等隨機
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
一致性哈希對應的LoadBalance本次不講解。
我個人對於Dubbo 提供的LoadBalance的看法是:基本上滿足日常使用,但是應該更加豐富。因為我們日常使用的是leastactive,但是因為該負載均衡策略是基於方法級別的,所以無法控制其他的方法對於應用的影響,這裏如果將統計的維度上升到機器緯度,其實可以做到類似於整個集群的leastactive,這樣的話就不容易出現部分幾臺機器負載特別高,而其余的大部分機器都有很多資源結余。當然也可以提供接口緯度的負載均衡,這個完全可以根據具體的業務實現定值,因為SPI機制的緣故,自定義負載均衡策略實現起來還是比較方便的。
路由
說到路由還是貼一張官方的圖會比較好理解一些
從上圖中能看出來router的作用其實是在LB之前的,也就是說LB的入參其實就是router的結果。
因為router可能理解起來並不直觀,因此還是大致介紹一下router的含義。
在日常的服務這裏過程中,比如我想給某個接口開個特權,專門設置一些提供者只供其調用;讀寫分離,讀操作配置專門的機器,寫操作配置專門的機器;某個消費者有問題,想及時下掉等等。都是可以通過router來實現的,明白了router的具體含義之後我們來一起看一下router的實現:
首先為了明確路由的基本規則,把官方的案例拿過來一起看一下:
服務調用信息,如:method, argument 等 (暫不支持參數路由) URL本身的字段,如:protocol, host, port 等 以及URL上的所有參數,如:application, organization 等
條件支持:
等號"="表示"匹配",如:host = 10.20.153.10 不等號"!="表示"不匹配",如:host != 10.20.153.10
值支持:
以逗號","分隔多個值,如:host != 10.20.153.10,10.20.153.11 以星號""結尾,表示通配,如:host != 10.20. 以美元符"$"開頭,表示引用消費者參數,如:host = $host
下面以ConditionRouter為例來看一下路由規則的具體實現
//構造函數初始化的時候難點在於when和then的初始化:
public ConditionRouter(URL url) {
this.url = url;
//路由規則的優先級,用於排序,優先級越大越靠前執行,可不填,缺省為0。
this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
//當路由結果為空時,是否強制執行,如果不強制執行,路由結果為空的路由規則將自動失效,可不填,缺省為flase
this.force = url.getParameter(Constants.FORCE_KEY, false);
try {
String rule = url.getParameterAndDecoded(Constants.RULE_KEY);
if (rule == null || rule.trim().length() == 0) {
throw new IllegalArgumentException("Illegal route rule!");
}
rule = rule.replace("consumer.", "").replace("provider.", "");
int i = rule.indexOf("=>");
// =>前的部分
String whenRule = i < 0 ? null : rule.substring(0, i).trim();
// =>後的部分
String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();
Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule);
Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule);
// NOTE: When條件是允許為空的,外部業務來保證類似的約束條件
this.whenCondition = when;
this.thenCondition = then;
} catch (ParseException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
/**
* 將對應的rule信息解析為對應的MatchPair
* host=10.20.153.10解析出來就是一個host:MatchPair,Matcher的matches內容為10.20.153.10
* host!=10.20.153.10解析出來就是一個host:MatchPair,Matcher的mismatches內容為10.20.153.10
* 可以理解為MatcherPair就是區分matches和mismatches的具體聚合類,拿到這個Matcher就拿到表達式初步解析後的數據
* @param rule
* @return
* @throws ParseException
*/
private static Map<String, MatchPair> parseRule(String rule)
throws ParseException {
Map<String, MatchPair> condition = new HashMap<String, MatchPair>();
if(StringUtils.isBlank(rule)) {
return condition;
}
// 匹配或不匹配Key-Value對
MatchPair pair = null;
// 多個Value值
Set<String> values = null;
final Matcher matcher = ROUTE_PATTERN.matcher(rule);
//例如:host=10.20.153.10 第一次匹配的group1=‘‘,group2=‘host‘,第二次匹配的group1=‘=‘,group2=‘10.20.153.10‘
while (matcher.find()) { // 逐個匹配
String separator = matcher.group(1);
String content = matcher.group(2);
// 表達式開始
if (separator == null || separator.length() == 0) {
pair = new MatchPair();
//‘host‘:new MatchPair()
condition.put(content, pair);
}
// &符號還沒有遇到過
else if ("&".equals(separator)) {
if (condition.get(content) == null) {
pair = new MatchPair();
condition.put(content, pair);
} else {
condition.put(content, pair);
}
}
// 匹配=號部分
else if ("=".equals(separator)) {
if (pair == null)
throw new RuntimeException();
values = pair.matches;
values.add(content);
}
// 匹配!=號部分
else if ("!=".equals(separator)) {
if (pair == null)
throw new RuntimeException();
values = pair.mismatches;
values.add(content);
}
// ,號直接跟著前面的=或者!=走
else if (",".equals(separator)) {
if (values == null || values.size() == 0)
throw new RuntimeException();
values.add(content);
} else {
throw new RuntimeException();
}
}
return condition;
}
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
throws RpcException {
if (invokers == null || invokers.size() == 0) {
return invokers;
}
try {
//如果沒有匹配到的前置條件後直接返回,意思就是當前作用的consumer信息不需要經過路由操作
// 如果路由的配置值有$開頭的話就將其替換為URL中對應的key的value
if (! matchWhen(url)) {
return invokers;
}
List<Invoker<T>> result = new ArrayList<Invoker<T>>();
//黑名單
if (thenCondition == null) {
logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
return result;
}
for (Invoker<T> invoker : invokers) {
//逐個匹配後置條件
if (matchThen(invoker.getUrl(), url)) {
result.add(invoker);
}
}
if (result.size() > 0) {
return result;
//感覺強制執行的話返回一個空的List並沒有卵用呀
} else if (force) {
logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY));
return result;
}
} catch (Throwable t) {
logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
}
return invokers;
}
想了解更多詳細資料:1903832579
DUBBO服務治理