dubbo原始碼之Directory與LoadBalance
Directory:
叢集目錄服務Directory, 代表多個Invoker, 可以看成List<Invoker>,它的值可能是動態變化的比如註冊中心推送變更。叢集選擇呼叫服務時通過目錄服務找到所有服務
StaticDirectory: 靜態目錄服務, 它的所有Invoker通過建構函式傳入, 服務消費方引用服務的時候, 服務對多註冊中心的引用,將Invokers集合直接傳入 StaticDirectory構造器,再由Cluster偽裝成一個Invoker;StaticDirectory的list方法直接返回所有invoker集合;
RegistryDirectory: 註冊目錄服務, 它的Invoker集合是從註冊中心獲取的, 它實現了NotifyListener介面實現了回撥介面notify(List<Url>)
通俗的來說,就是一個快取和更新快取的過程
Directory目錄服務的更新過程
RegistryProtocol.doRefer方法,也就是消費端在初始化的時候,這裡涉及到了RegistryDirectory這個類。然後執行cluster.join(directory)方法。這些程式碼在上篇部落格有分析過。
cluster.join其實就是將Directory中的多個Invoker偽裝成一個Invoker, 對上層透明,包含叢集的容錯機制
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);//對多個invoker進行組裝 directory.setRegistry(registry); //ZookeeperRegistry directory.setProtocol(protocol); //protocol=Protocol$Adaptive //url=consumer://192.168.111.... URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); //會把consumer://192... 註冊到註冊中心 if (! Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { //zkClient.create() registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); //Cluster$Adaptive return cluster.join(directory); }
directory.subscribe:
訂閱節點的變化,
1. 當zookeeper上指定節點發生變化以後,會通知到RegistryDirectory的notify方法
2. 將url轉化為invoker物件
呼叫過程中invokers的使用
再呼叫過程中,AbstractClusterInvoker.invoke方法中:其中list(invocation) 就是獲取directory中所快取的 invoker。呼叫AbstrctDirectory的list方法,再轉由呼叫RegisteryDirectory的doList,拿到成員變數methodInvokerMap裡的值。
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
負載均衡LoadBalance:
LoadBalance負載均衡, 負責從多個 Invokers中選出具體的一個Invoker用於本次呼叫,呼叫過程中包含了負載均衡的演算法。
在AbstractClusterInvoker.invoke中程式碼如下,通過名稱獲得指定的擴充套件點。RandomLoadBalance:
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) {//預設拓展點是隨機演算法@SPI(RandomLoadBalance.NAME) loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
AbstractClusterInvoker.doselect
呼叫LoadBalance.select方法,講invokers按照指定演算法進行負載
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.size() == 0) return null; if (invokers.size() == 1) return invokers.get(0); // 如果只有兩個invoker,退化成輪循 if (invokers.size() == 2 && selected != null && selected.size() > 0) { return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0); } Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); //如果 selected中包含(優先判斷) 或者 不可用&&availablecheck=true 則重試. if( (selected != null && selected.contains(invoker)) ||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){ try{ Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if(rinvoker != null){ invoker = rinvoker; }else{ //看下第一次選的位置,如果不是最後,選+1位置. int index = invokers.indexOf(invoker); try{ //最後在避免碰撞 invoker = index <invokers.size()-1?invokers.get(index+1) :invoker; }catch (Exception e) { logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e); } } }catch (Throwable t){ logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t); } } return invoker; }
通過呼叫 AbstrctLoadBalance 的loadbalance.select(invokers, getUrl(), invocation) 轉向具體的實現類,這裡就是隨機演算法負載的 doSelect:
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // 總個數 int totalWeight = 0; // 總權重 boolean sameWeight = true; // 權重是否都一樣 for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; // 累計總權重 if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; // 計算所有權重是否一樣 } } if (totalWeight > 0 && ! sameWeight) { // 如果權重不相同且權重大於0則按總權重數隨機 int offset = random.nextInt(totalWeight); // 並確定隨機值落在哪個片斷上 for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } // 如果權重相同或權重為0則均等隨機 return invokers.get(random.nextInt(length)); }
配置權重可以在配置檔案中再service中可以配置weight 來確定隨機的傾向
Random LoadBalance
- 隨機,按權重設定隨機概率。
- 在一個截面上碰撞的概率高,但呼叫量越大分佈越均勻,而且按概率使用權重後也比較均勻,有利於動態調整提供者權重。
RoundRobin LoadBalance
- 輪循,按公約後的權重設定輪循比率。
- 存在慢的提供者累積請求問題,比如:第二臺機器很慢,但沒掛,當請求調到第二臺時就卡在那,久而久之,所有請求都卡在調到第二臺上。
LeastActive LoadBalance
- 最少活躍呼叫數,相同活躍數的隨機,活躍數指呼叫前後計數差。
- 使慢的提供者收到更少請求,因為越慢的提供者的呼叫前後計數差會越大。
ConsistentHash LoadBalance
- 一致性Hash,相同引數的請求總是發到同一提供者。
- 當某一臺提供者掛時,原本發往該提供者的請求,基於虛擬節點,平攤到其它提供者,不會引起劇烈變動。
- 預設只對第一個引數Hash,如果要修改,請配置<dubbo:parameter key="hash.arguments" value="0,1" />
- 預設用160份虛擬節點,如果要修改,請配置<dubbo:parameter key="hash.nodes" value="320" />