Soul閘道器原始碼分析-6期
文章目錄
今日任務
- AlibabaDubboPlugin 研究
- ApacheDubboPlugin 研究
AlibabaDubboPlugin
將該啟動的都開啟, Mysql、Zookeeper、AlibabaDubbo服務、Soul-admin、Soul-bootstrap.
值得注意的是, 啟動閘道器時列印了這幾行日誌:
2021-01-19 19:56:44.680 INFO 1623 --- [ctReadThread-37] o.d.s.p.a.d.c.ApplicationConfigCache : init aliaba dubbo reference success there meteData is :MetaData(id=1350474912353136640, appName=dubbo, contextPath=null, path=/dubbo/insert, rpcType=dubbo, serviceName=org.dromara.soul.test.dubbo.api.service.DubboTestService, methodName=insert, parameterTypes= org.dromara.soul.test.dubbo.api.entity.DubboTest, rpcExt={"timeout":10000}, enabled=true)
2021-01-19 19:56:44.829 INFO 1623 --- [ctReadThread-37] o.d.s.p.a.d.c.ApplicationConfigCache : init aliaba dubbo reference success there meteData is :MetaData(id=1350474914580312064, appName=dubbo, contextPath= null, path=/dubbo/findByIdsAndName, rpcType=dubbo, serviceName=org.dromara.soul.test.dubbo.api.service.DubboMultiParamService, methodName=findByIdsAndName, parameterTypes=java.util.List,java.lang.String, rpcExt={"timeout":10000}, enabled=true)
看著像服務資訊快取相關, 先記下 `ApplicationConfigCache` 這個類.
這次我們有了以前的經驗, 直接找到 SoulWebHandler
的 execute()
方法, 看看呼叫的外掛鏈長啥樣:
plugins = {[email protected]} size = 12
0 = {[email protected]}
1 = {[email protected]}
2 = {[email protected]}
3 = {[email protected]}
4 = {[email protected]}
5 = {[email protected]}
6 = {[email protected]}
7 = {[email protected]}
8 = {[email protected]}
9 = {[email protected]}
10 = {[email protected]}
11 = {[email protected]}
比起簡單的HTTP呼叫, 多了兩個外掛 BodyParamPlugin
、AlibabaDubboPlugin
、DubboResponsePlugin
, 這兩個應該就是 Dubbo 服務相關的外掛了.
繼續跑跑鏈呼叫, 像之前的老熟人 DividePlugin
、WebClientPlugin
、WebClientResponsePlugin
和 WebSocketPlugin
都直接跳過了, 能理解, 畢竟走的 RpcType 型別為 dubbo.
最後再關注下執行緒變動資訊, 因為有服務呼叫肯定應該是要非同步的, 這裡不是在呼叫服務就是準備呼叫, 總之它不老實就是了:
可以從以下截圖看到, 外掛呼叫在 BodyParamPlugin
到 AlibabaDubboPlugin
間執行緒發生變動, 看來 BodyParamPlugin
是關鍵的請求轉發外掛, 我們從它來開刀, 跟蹤它的程式碼.
PS: 這裡注意下, 此處 BodyParamPlugin
完整路徑為: org.dromara.soul.plugin.alibaba.dubbo.param
public class BodyParamPlugin implements SoulPlugin {
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
final ServerHttpRequest request = exchange.getRequest();
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
if (Objects.nonNull(soulContext) && RpcTypeEnum.DUBBO.getName().equals(soulContext.getRpcType())) {
// 獲取請求頭的 contentType
MediaType mediaType = request.getHeaders().getContentType();
ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
return serverRequest.bodyToMono(String.class)
.switchIfEmpty(Mono.defer(() ->
// 切換執行緒了
Mono.just(""))
)
.flatMap(body -> {
// 判斷 contentType 是否 json, 是則將 body 資訊注入上下文, 用 key 標識是 dubbo引數
if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
exchange.getAttributes().put(Constants.DUBBO_PARAMS, body);
}
// 繼續鏈呼叫
return chain.execute(exchange);
});
}
return chain.execute(exchange);
}
}
沒看到關鍵的呼叫服務的資訊, 繼續看下個外掛 AlibabaPlugin
(僅保留關鍵程式碼).
public class AlibabaDubboPlugin extends AbstractSoulPlugin {
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
// 獲取 BodyParamPlugin 注入的 body 引數
String body = exchange.getAttribute(Constants.DUBBO_PARAMS);
// 獲取元資料資訊(路徑、rpcType等)
MetaData metaData = exchange.getAttribute(Constants.META_DATA);
// Dubbo服務呼叫
Object result = alibabaDubboProxyService.genericInvoker(body, metaData);
// ..
exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, result);
return chain.execute(exchange);
}
}
看來 alibabaDubboProxyService.genericInvoker()
這句就是呼叫關鍵了, 追蹤被呼叫的 AlibabaDubboProxyService
(僅保留核心程式碼):
public class AlibabaDubboProxyService {
public Object genericInvoker(final String body, final MetaData metaData) throws SoulException {
// 這裡是關鍵, 通過服務名在快取中獲得服務資訊
ReferenceConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getServiceName());
GenericService genericService = reference.get();
// ...
return genericService.$invoke(metaData.getMethodName(), new String[]{}, new Object[]{});
}
}
方法的第一句就是關鍵, debug看看是什麼資訊:
<dubbo:reference protocol="dubbo" interface="org.dromara.soul.test.dubbo.api.service.DubboTestService" uniqueServiceName="org.dromara.soul.test.dubbo.api.service.DubboTestService" generic="true" generic="true" timeout="10000" id="org.dromara.soul.test.dubbo.api.service.DubboTestService" />
這裡直接從閘道器快取裡, 獲得了dubbo服務的介面相關配置, 那這個快取的資料又是怎麼來的呢? 還記得啟動閘道器時, 列印的日誌資訊麼, 程式碼裡也能看到 ApplicationConfigCache
的存在. 我們通過那個日誌, 追溯下這個類的具體生成快取的方法:
public final class ApplicationConfigCache {
public ReferenceConfig<GenericService> build(final MetaData metaData) {
ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
reference.setGeneric(true);
reference.setApplication(applicationConfig);
reference.setRegistry(registryConfig);
reference.setInterface(metaData.getServiceName());
reference.setProtocol("dubbo");
// ...
Object obj = reference.get();
if (obj != null) {
// 通過這行日誌定位方法
log.info("init aliaba dubbo reference success there meteData is :{}", metaData.toString());
// 服務名稱與資訊放入快取
cache.put(metaData.getServiceName(), reference);
}
return reference;
}
}
這裡是載入快取的地方, 那麼這個方法是怎麼被呼叫的? 追溯到 AlibabaDubboMetaDataSubscriber
:
public class AlibabaDubboMetaDataSubscriber implements MetaDataSubscriber {
private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();
@Override
public void onSubscribe(final MetaData metaData) {
if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
// ...
// 這個方法最終會調到 build()
ApplicationConfigCache.getInstance().initRef(metaData);
META_DATA.put(metaData.getPath(), metaData);
}
}
}
其實看到它的介面類 MetaDataSubscriber
就能知道個大概了, 這又是個實現了元資料更新訂閱的類, 會接收 soul-admin
管理後臺推送的元資料資訊, 篩選出 RpcType 為 Dubbo 型別的元資料進行快取更新.
@RequiredArgsConstructor
public class MetaDataHandler extends AbstractDataHandler<MetaData> {
private final List<MetaDataSubscriber> metaDataSubscribers;
@Override
protected void doRefresh(final List<MetaData> dataList) {
metaDataSubscribers.forEach(MetaDataSubscriber::refresh);
// 遍歷所有元資料更新訂閱類
dataList.forEach(metaData -> metaDataSubscribers.forEach(metaDataSubscriber -> metaDataSubscriber.onSubscribe(metaData)));
}
// ...
}
最終的呼叫 dubbo 服務的程式碼這裡再貼下, 都是 Alibaba Dubbo 框架的內容了, 不深入分析:
public class AlibabaDubboProxyService {
public Object genericInvoker(final String body, final MetaData metaData) throws SoulException {
// ...
GenericService genericService = reference.get();
if (null == body || "".equals(body) || "{}".equals(body) || "null".equals(body)) {
return genericService.$invoke(metaData.getMethodName(), new String[]{}, new Object[]{});
} else {
Pair<String[], Object[]> pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
return genericService.$invoke(metaData.getMethodName(), pair.getLeft(), pair.getRight());
}
}
}
ApacheDubboPlugin
開始分析 ApacheDubboPlugin 前, 先確認下 soul-bootstrap
閘道器的 pom.xml
檔案, apache-dubbo相關的依賴是否啟用, 尤其注意看 soul-spring-boot-starter-plugin-apache-dubbo
是否啟用.
啟動三個專案 soul-admin
、soul-bootstrap
、soul-test-apache-dubbo-service
.
啟動時報錯 Duplicate key
打臉的是, 閘道器立馬報了個錯:
Caused by: java.lang.IllegalStateException: Duplicate key org.dromar[email protected]4e83a98
...
at org.dromara.soul.plugin.base.cache.CommonPluginDataSubscriber.<init>(CommonPluginDataSubscriber.java:46) ~[classes/:na]
at org.dromara.soul.web.configuration.SoulConfiguration.pluginDataSubscriber(SoulConfiguration.java:97) ~[classes/:na]
不要緊張這些情況都是小case, 看到 CommonPluginDataSubscriber
這個類且看過我3期的分析文章 的同學, 應該會有種熟悉感… 不怕, 從頭梳理下流程你就懂了.
首先 SoulConfiguration
配置類會載入一個叫做 pluginDataSubscriber
的 Bean, 用作外掛資料的訂閱, 當然訂閱物件是後臺管理系統:
public class SoulConfiguration {
@Bean
public PluginDataSubscriber pluginDataSubscriber(final ObjectProvider<List<PluginDataHandler>> pluginDataHandlerList) {
return new CommonPluginDataSubscriber(pluginDataHandlerList.getIfAvailable(Collections::emptyList));
}
}
這裡的入參由來, 會藉助 spring4.X 的機制獲得所有父類為 PluginDataHandler
的實現類, 自然也會找到 CommonPluginDataSubscriber
:
public class CommonPluginDataSubscriber implements PluginDataSubscriber {
// ...
}
CommonPluginDataSubscriber
的構造器開始工作, 接收 pluginDataHandlerList
入參, 將這些 Bean 物件轉換成一個快取map:
private final Map<String, PluginDataHandler> handlerMap;
public CommonPluginDataSubscriber(final List<PluginDataHandler> pluginDataHandlerList) {
// 異常問題就是此行
this.handlerMap = pluginDataHandlerList.stream().collect(Collectors.toConcurrentMap(PluginDataHandler::pluginNamed, e -> e));
}
我們的異常就是在這裡了, 構建hash時key重複了, 說明 pluginDataHandlerList
這個入參的 pluginName
屬性有重複. 那麼這個入參怎麼來的呢? 自然是 spring注入的, 找到這些 Bean 即可.
答案就在我們閘道器中引入的 soul-spring-boot-starter-plugin-xx
專案中, 我現在不僅引入了 alibaba-dubbo
, 也引入了 apache-dubbo
, 這兩個 PluginDataHandler
實現子類衝突了:
public class AlibabaDubboPluginDataHandler implements PluginDataHandler {
@Override
public String pluginNamed() {
return PluginEnum.DUBBO.getName();
}
}
public class ApacheDubboPluginDataHandler implements PluginDataHandler {
@Override
public String pluginNamed() {
return PluginEnum.DUBBO.getName();
}
}
怎麼解決呢? 在閘道器中遮蔽調 soul-spring-boot-starter-plugin-alibaba-dubbo
這個依賴即可, 它的 AlibabaDubboPluginConfiguration
配置工廠不工作了自然不會有 AlibabaDubboPluginDataHandler
這個Bean了.
PS: 我挺想改了其中一個 pluginNamed()
返回值, 但鬼知道哪裡會有這個列舉的使用, 不作死了.
正題
照舊找到 SoulWebHandler
的 execute()
方法, 看看呼叫的外掛鏈長啥樣:
plugins = {[email protected]} size = 11
0 = {[email protected]}
1 = {[email protected]}
2 = {[email protected]}
3 = {[email protected]}
4 = {[email protected]}
5 = {[email protected]}
6 = {[email protected]}
7 = {[email protected]}
8 = {[email protected]}
9 = {[email protected]}
10 = {[email protected]}
注意: 這裡的 BodyParamPlugin
完整路徑 org.dromara.soul.plugin.apache.dubbo.param
除了把 BodyParamPlugin
、AlibabaDubboPlugin
換成 Apache包裡的 BodyParamPlugin
、ApacheDubboPlugin
, 就沒啥區別了. 再具體看看 BodyParamPlugin
有什麼不同:
public class BodyParamPlugin implements SoulPlugin {
// ..
@Override
public String named() {
return "apache-dubbo-body-param";
}
}
除了 named()
方法返回的字串不一樣, 沒看出其他區別. 各種變數的類路徑也都一樣. 看看 ApacheDubboPlugin … 終於看出不一樣了, 主要是呼叫的樁實現上不同 (僅保留核心程式碼):
public class ApacheDubboProxyService {
public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
// ...
GenericService genericService = reference.get();
Pair<String[], Object[]> pair = new ImmutablePair<>(new String[]{}, new Object[]{});
// 使用 apache dubbo 的非同步回撥
CompletableFuture<Object> future = genericService.$invokeAsync(metaData.getMethodName(), pair.getLeft(), pair.getRight());
return Mono.fromFuture(future.thenApply(ret -> {
if (Objects.nonNull(ret)) {
exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, ret);
} else {
exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, Constants.DUBBO_RPC_RESULT_EMPTY);
}
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
return ret;
}));
}
}
ApacheDubboPlugin 對服務的呼叫是非同步回撥的模型 $invokeAsync()
, 不用阻塞執行緒等待結果. 而 AlibabaDubboPlugin 在這塊的呼叫模型則是同步的 $invoke()
.
這塊我其實挺有疑惑的, 沒研究過 Apache-dubbo 和 Alibaba-dubbo, 不太清楚為什麼一個支援$invokeAsync()
而另一個僅支援 $invoke()
. 今後會了解一二再出個番外篇.