1. 程式人生 > 程式設計 >Dubbo原始碼解析(三十九)叢集——merger

Dubbo原始碼解析(三十九)叢集——merger

叢集——merger

目標:介紹dubbo中叢集的分組聚合,介紹dubbo-cluster下merger包的原始碼。

前言

按組合並返回結果 ,比如選單服務,介面一樣,但有多種實現,用group區分,現在消費方需從每種group中呼叫一次返回結果,合併結果返回,這樣就可以實現聚合選單項。這個時候就要用到分組聚合。

原始碼分析

(一)MergeableCluster

public class MergeableCluster implements Cluster {

    public static final String NAME = "mergeable";

    @Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立MergeableClusterInvoker return new MergeableClusterInvoker<T>(directory); } } 複製程式碼

該類實現了Cluster介面,是分組集合的叢集實現。

(二)MergeableClusterInvoker

該類是分組聚合的實現類,其中最關機的就是invoke方法。

@Override
@SuppressWarnings("rawtypes") public Result invoke(final Invocation invocation) throws RpcException { // 獲得invoker集合 List<Invoker<T>> invokers = directory.list(invocation); /** * 獲得是否merger */ String merger = getUrl().getMethodParameter(invocation.getMethodName(),Constants.MERGER_KEY); // 如果沒有設定需要聚合,則只呼叫一個invoker的
if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger,only invoke one Group // 只要有一個可用就返回 for (final Invoker<T> invoker : invokers) { if (invoker.isAvailable()) { return invoker.invoke(invocation); } } return invokers.iterator().next().invoke(invocation); } // 返回型別 Class<?> returnType; try { // 獲得返回型別 returnType = getInterface().getMethod( invocation.getMethodName(),invocation.getParameterTypes()).getReturnType(); } catch (NoSuchMethodException e) { returnType = null; } // 結果集合 Map<String,Future<Result>> results = new HashMap<String,Future<Result>>(); // 迴圈invokers for (final Invoker<T> invoker : invokers) { // 獲得每次呼叫的future Future<Result> future = executor.submit(new Callable<Result>() { @Override public Result call() throws Exception { // 回撥,把返回結果放入future return invoker.invoke(new RpcInvocation(invocation,invoker)); } }); // 加入集合 results.put(invoker.getUrl().getServiceKey(),future); } Object result = null; List<Result> resultList = new ArrayList<Result>(results.size()); // 獲得超時時間 int timeout = getUrl().getMethodParameter(invocation.getMethodName(),Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); // 遍歷每一個結果 for (Map.Entry<String,Future<Result>> entry : results.entrySet()) { Future<Result> future = entry.getValue(); try { // 獲得呼叫返回的結果 Result r = future.get(timeout,TimeUnit.MILLISECONDS); if (r.hasException()) { log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + " failed: " + r.getException().getMessage(),r.getException()); } else { // 加入集合 resultList.add(r); } } catch (Exception e) { throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(),e); } } // 如果為空,則返回空的結果 if (resultList.isEmpty()) { return new RpcResult((Object) null); } else if (resultList.size() == 1) { // 如果只有一個結果,則返回該結果 return resultList.iterator().next(); } // 如果返回型別是void,也就是沒有返回值,那麼返回空結果 if (returnType == void.class) { return new RpcResult((Object) null); } // 根據方法來合併,將呼叫返回結果的指定方法進行合併 if (merger.startsWith(".")) { merger = merger.substring(1); Method method; try { // 獲得方法 method = returnType.getMethod(merger,returnType); } catch (NoSuchMethodException e) { throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " + returnType.getClass().getName() + " ]"); } // 有 Method ,進行合併 if (!Modifier.isPublic(method.getModifiers())) { method.setAccessible(true); } // 從集合中移除 result = resultList.remove(0).getValue(); try { // 方法返回型別匹配,合併時,修改 result if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) { for (Result r : resultList) { result = method.invoke(result,r.getValue()); } } else { // 方法返回型別不匹配,合併時,不修改 result for (Result r : resultList) { method.invoke(result,r.getValue()); } } } catch (Exception e) { throw new RpcException("Can not merge result: " + e.getMessage(),e); } } else { // 基於 Merger Merger resultMerger; // 如果是預設的方式 if (ConfigUtils.isDefault(merger)) { // 獲得該型別的合併方式 resultMerger = MergerFactory.getMerger(returnType); } else { // 如果不是預設的,則配置中指定獲得Merger的實現類 resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger); } if (resultMerger != null) { List<Object> rets = new ArrayList<Object>(resultList.size()); // 遍歷返回結果 for (Result r : resultList) { // 加入到rets rets.add(r.getValue()); } // 合併 result = resultMerger.merge( rets.toArray((Object[]) Array.newInstance(returnType,0))); } else { throw new RpcException("There is no merger to merge result."); } } // 返回結果 return new RpcResult(result); } 複製程式碼

前面部分在講獲得呼叫的結果,後面部分是對結果的合併,合併有兩種方式,根據配置不同可用分為基於方法的合併和基於merger的合併。

(三)MergerFactory

Merger 工廠類,獲得指定型別的Merger 物件。

public class MergerFactory {

    /**
     * Merger 物件快取
     */
    private static final ConcurrentMap<Class<?>,Merger<?>> mergerCache =
            new ConcurrentHashMap<Class<?>,Merger<?>>();

    /**
     * 獲得指定型別的Merger物件
     * @param returnType
     * @param <T>
     * @return
     */
    public static <T> Merger<T> getMerger(Class<T> returnType) {
        Merger result;
        // 如果型別是集合
        if (returnType.isArray()) {
            // 獲得型別
            Class type = returnType.getComponentType();
            // 從快取中獲得該型別的Merger物件
            result = mergerCache.get(type);
            // 如果為空,則
            if (result == null) {
                // 初始化所有的 Merger 擴充套件物件,到 mergerCache 快取中。
                loadMergers();
                // 從集合中取出對應的Merger物件
                result = mergerCache.get(type);
            }
            // 如果結果為空,則直接返回ArrayMerger的單例
            if (result == null && !type.isPrimitive()) {
                result = ArrayMerger.INSTANCE;
            }
        } else {
            // 否則直接從mergerCache中取出
            result = mergerCache.get(returnType);
            // 如果為空
            if (result == null) {
                // 初始化所有的 Merger 擴充套件物件,到 mergerCache 快取中。
                loadMergers();
                // 從集合中取出
                result = mergerCache.get(returnType);
            }
        }
        return result;
    }

    /**
     * 初始化所有的 Merger 擴充套件物件,到 mergerCache 快取中。
     */
    static void loadMergers() {
        // 獲得Merger所有的擴充套件物件名
        Set<String> names = ExtensionLoader.getExtensionLoader(Merger.class)
                .getSupportedExtensions();
        // 遍歷
        for (String name : names) {
            // 載入每一個擴充套件實現,然後放入快取。
            Merger m = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(name);
            mergerCache.putIfAbsent(ReflectUtils.getGenericClass(m.getClass()),m);
        }
    }

}
複製程式碼

邏輯比較簡單。

(四)ArrayMerger

因為不同的型別有不同的Merger實現,我們可以來看看這個圖片:

merger

可以看到有好多好多,我就講解其中的一種,偷懶一下,其他的麻煩有興趣的去看看原始碼了。

public class ArrayMerger implements Merger<Object[]> {

    /**
     * 單例
     */
    public static final ArrayMerger INSTANCE = new ArrayMerger();

    @Override
    public Object[] merge(Object[]... others) {
        // 如果長度為0  則直接返回
        if (others.length == 0) {
            return null;
        }
        // 總長
        int totalLen = 0;
        // 遍歷所有需要合併的物件
        for (int i = 0; i < others.length; i++) {
            Object item = others[i];
            // 如果為陣列
            if (item != null && item.getClass().isArray()) {
                // 累加陣列長度
                totalLen += Array.getLength(item);
            } else {
                throw new IllegalArgumentException((i + 1) + "th argument is not an array");
            }
        }

        if (totalLen == 0) {
            return null;
        }

        // 獲得陣列型別
        Class<?> type = others[0].getClass().getComponentType();

        // 建立長度
        Object result = Array.newInstance(type,totalLen);
        int index = 0;
        // 遍歷需要合併的物件
        for (Object array : others) {
            // 遍歷每個陣列中的資料
            for (int i = 0; i < Array.getLength(array); i++) {
                // 加入到最終結果中
                Array.set(result,index++,Array.get(array,i));
            }
        }
        return (Object[]) result;
    }

}
複製程式碼

是不是很簡單,就是迴圈合併就可以了。

後記

該部分相關的原始碼解析地址:github.com/CrazyHZM/in…

該文章講解了叢集中關於分組聚合實現的部分。接下來我將開始對叢集模組關於路由部分進行講解。