Dubbo原始碼解析(三十九)叢集——merger
阿新 • • 發佈:2020-06-24
叢集——merger
目標:介紹dubbo中叢集的分組聚合,介紹dubbo-cluster下merger包的原始碼。
前言
按組合並返回結果 ,比如選單服務,介面一樣,但有多種實現,用group區分,現在消費方需從每種group中呼叫一次返回結果,合併結果返回,這樣就可以實現聚合選單項。這個時候就要用到分組聚合。
原始碼分析
(一)MergeableCluster
public class MergeableCluster implements Cluster {
public static final String NAME = "mergeable";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
// 建立MergeableClusterInvoker
return new MergeableClusterInvoker<T>(directory);
}
}
複製程式碼
該類實現了Cluster介面,是分組集合的叢集實現。
(二)MergeableClusterInvoker
該類是分組聚合的實現類,其中最關機的就是invoke方法。
@Override
@SuppressWarnings("rawtypes")
public Result invoke(final Invocation invocation) throws RpcException {
// 獲得invoker集合
List<Invoker<T>> invokers = directory.list(invocation);
/**
* 獲得是否merger
*/
String merger = getUrl().getMethodParameter(invocation.getMethodName(),Constants.MERGER_KEY);
// 如果沒有設定需要聚合,則只呼叫一個invoker的
if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger,only invoke one Group
// 只要有一個可用就返回
for (final Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
return invokers.iterator().next().invoke(invocation);
}
// 返回型別
Class<?> returnType;
try {
// 獲得返回型別
returnType = getInterface().getMethod(
invocation.getMethodName(),invocation.getParameterTypes()).getReturnType();
} catch (NoSuchMethodException e) {
returnType = null;
}
// 結果集合
Map<String,Future<Result>> results = new HashMap<String,Future<Result>>();
// 迴圈invokers
for (final Invoker<T> invoker : invokers) {
// 獲得每次呼叫的future
Future<Result> future = executor.submit(new Callable<Result>() {
@Override
public Result call() throws Exception {
// 回撥,把返回結果放入future
return invoker.invoke(new RpcInvocation(invocation,invoker));
}
});
// 加入集合
results.put(invoker.getUrl().getServiceKey(),future);
}
Object result = null;
List<Result> resultList = new ArrayList<Result>(results.size());
// 獲得超時時間
int timeout = getUrl().getMethodParameter(invocation.getMethodName(),Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
// 遍歷每一個結果
for (Map.Entry<String,Future<Result>> entry : results.entrySet()) {
Future<Result> future = entry.getValue();
try {
// 獲得呼叫返回的結果
Result r = future.get(timeout,TimeUnit.MILLISECONDS);
if (r.hasException()) {
log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
" failed: " + r.getException().getMessage(),r.getException());
} else {
// 加入集合
resultList.add(r);
}
} catch (Exception e) {
throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(),e);
}
}
// 如果為空,則返回空的結果
if (resultList.isEmpty()) {
return new RpcResult((Object) null);
} else if (resultList.size() == 1) {
// 如果只有一個結果,則返回該結果
return resultList.iterator().next();
}
// 如果返回型別是void,也就是沒有返回值,那麼返回空結果
if (returnType == void.class) {
return new RpcResult((Object) null);
}
// 根據方法來合併,將呼叫返回結果的指定方法進行合併
if (merger.startsWith(".")) {
merger = merger.substring(1);
Method method;
try {
// 獲得方法
method = returnType.getMethod(merger,returnType);
} catch (NoSuchMethodException e) {
throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " +
returnType.getClass().getName() + " ]");
}
// 有 Method ,進行合併
if (!Modifier.isPublic(method.getModifiers())) {
method.setAccessible(true);
}
// 從集合中移除
result = resultList.remove(0).getValue();
try {
// 方法返回型別匹配,合併時,修改 result
if (method.getReturnType() != void.class
&& method.getReturnType().isAssignableFrom(result.getClass())) {
for (Result r : resultList) {
result = method.invoke(result,r.getValue());
}
} else {
// 方法返回型別不匹配,合併時,不修改 result
for (Result r : resultList) {
method.invoke(result,r.getValue());
}
}
} catch (Exception e) {
throw new RpcException("Can not merge result: " + e.getMessage(),e);
}
} else {
// 基於 Merger
Merger resultMerger;
// 如果是預設的方式
if (ConfigUtils.isDefault(merger)) {
// 獲得該型別的合併方式
resultMerger = MergerFactory.getMerger(returnType);
} else {
// 如果不是預設的,則配置中指定獲得Merger的實現類
resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
}
if (resultMerger != null) {
List<Object> rets = new ArrayList<Object>(resultList.size());
// 遍歷返回結果
for (Result r : resultList) {
// 加入到rets
rets.add(r.getValue());
}
// 合併
result = resultMerger.merge(
rets.toArray((Object[]) Array.newInstance(returnType,0)));
} else {
throw new RpcException("There is no merger to merge result.");
}
}
// 返回結果
return new RpcResult(result);
}
複製程式碼
前面部分在講獲得呼叫的結果,後面部分是對結果的合併,合併有兩種方式,根據配置不同可用分為基於方法的合併和基於merger的合併。
(三)MergerFactory
Merger 工廠類,獲得指定型別的Merger 物件。
public class MergerFactory {
/**
* Merger 物件快取
*/
private static final ConcurrentMap<Class<?>,Merger<?>> mergerCache =
new ConcurrentHashMap<Class<?>,Merger<?>>();
/**
* 獲得指定型別的Merger物件
* @param returnType
* @param <T>
* @return
*/
public static <T> Merger<T> getMerger(Class<T> returnType) {
Merger result;
// 如果型別是集合
if (returnType.isArray()) {
// 獲得型別
Class type = returnType.getComponentType();
// 從快取中獲得該型別的Merger物件
result = mergerCache.get(type);
// 如果為空,則
if (result == null) {
// 初始化所有的 Merger 擴充套件物件,到 mergerCache 快取中。
loadMergers();
// 從集合中取出對應的Merger物件
result = mergerCache.get(type);
}
// 如果結果為空,則直接返回ArrayMerger的單例
if (result == null && !type.isPrimitive()) {
result = ArrayMerger.INSTANCE;
}
} else {
// 否則直接從mergerCache中取出
result = mergerCache.get(returnType);
// 如果為空
if (result == null) {
// 初始化所有的 Merger 擴充套件物件,到 mergerCache 快取中。
loadMergers();
// 從集合中取出
result = mergerCache.get(returnType);
}
}
return result;
}
/**
* 初始化所有的 Merger 擴充套件物件,到 mergerCache 快取中。
*/
static void loadMergers() {
// 獲得Merger所有的擴充套件物件名
Set<String> names = ExtensionLoader.getExtensionLoader(Merger.class)
.getSupportedExtensions();
// 遍歷
for (String name : names) {
// 載入每一個擴充套件實現,然後放入快取。
Merger m = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(name);
mergerCache.putIfAbsent(ReflectUtils.getGenericClass(m.getClass()),m);
}
}
}
複製程式碼
邏輯比較簡單。
(四)ArrayMerger
因為不同的型別有不同的Merger實現,我們可以來看看這個圖片:
可以看到有好多好多,我就講解其中的一種,偷懶一下,其他的麻煩有興趣的去看看原始碼了。
public class ArrayMerger implements Merger<Object[]> {
/**
* 單例
*/
public static final ArrayMerger INSTANCE = new ArrayMerger();
@Override
public Object[] merge(Object[]... others) {
// 如果長度為0 則直接返回
if (others.length == 0) {
return null;
}
// 總長
int totalLen = 0;
// 遍歷所有需要合併的物件
for (int i = 0; i < others.length; i++) {
Object item = others[i];
// 如果為陣列
if (item != null && item.getClass().isArray()) {
// 累加陣列長度
totalLen += Array.getLength(item);
} else {
throw new IllegalArgumentException((i + 1) + "th argument is not an array");
}
}
if (totalLen == 0) {
return null;
}
// 獲得陣列型別
Class<?> type = others[0].getClass().getComponentType();
// 建立長度
Object result = Array.newInstance(type,totalLen);
int index = 0;
// 遍歷需要合併的物件
for (Object array : others) {
// 遍歷每個陣列中的資料
for (int i = 0; i < Array.getLength(array); i++) {
// 加入到最終結果中
Array.set(result,index++,Array.get(array,i));
}
}
return (Object[]) result;
}
}
複製程式碼
是不是很簡單,就是迴圈合併就可以了。
後記
該部分相關的原始碼解析地址:github.com/CrazyHZM/in…
該文章講解了叢集中關於分組聚合實現的部分。接下來我將開始對叢集模組關於路由部分進行講解。