1. 程式人生 > 其它 >Soul閘道器原始碼分析-6期

Soul閘道器原始碼分析-6期

技術標籤:閘道器java

文章目錄


今日任務

  • AlibabaDubboPlugin 研究
  • ApacheDubboPlugin 研究



AlibabaDubboPlugin


將該啟動的都開啟, Mysql、Zookeeper、AlibabaDubbo服務、Soul-admin、Soul-bootstrap.

值得注意的是, 啟動閘道器時列印了這幾行日誌:

2021-01-19 19:56:44.680  INFO 1623 --- [ctReadThread-37]
o.d.s.p.a.d.c.ApplicationConfigCache : init aliaba dubbo reference success there meteData is :MetaData(id=1350474912353136640, appName=dubbo, contextPath=null, path=/dubbo/insert, rpcType=dubbo, serviceName=org.dromara.soul.test.dubbo.api.service.DubboTestService, methodName=insert, parameterTypes=
org.dromara.soul.test.dubbo.api.entity.DubboTest, rpcExt={"timeout":10000}, enabled=true) 2021-01-19 19:56:44.829 INFO 1623 --- [ctReadThread-37] o.d.s.p.a.d.c.ApplicationConfigCache : init aliaba dubbo reference success there meteData is :MetaData(id=1350474914580312064, appName=dubbo, contextPath=
null, path=/dubbo/findByIdsAndName, rpcType=dubbo, serviceName=org.dromara.soul.test.dubbo.api.service.DubboMultiParamService, methodName=findByIdsAndName, parameterTypes=java.util.List,java.lang.String, rpcExt={"timeout":10000}, enabled=true)

看著像服務資訊快取相關, 先記下 `ApplicationConfigCache` 這個類.

這次我們有了以前的經驗, 直接找到 SoulWebHandlerexecute() 方法, 看看呼叫的外掛鏈長啥樣:

plugins = {[email protected]}  size = 12
 0 = {[email protected]} 
 1 = {[email protected]} 
 2 = {[email protected]} 
 3 = {[email protected]} 
 4 = {[email protected]} 
 5 = {[email protected]} 
 6 = {[email protected]} 
 7 = {[email protected]} 
 8 = {[email protected]} 
 9 = {[email protected]} 
 10 = {[email protected]} 
 11 = {[email protected]} 

比起簡單的HTTP呼叫, 多了兩個外掛 BodyParamPluginAlibabaDubboPluginDubboResponsePlugin, 這兩個應該就是 Dubbo 服務相關的外掛了.

繼續跑跑鏈呼叫, 像之前的老熟人 DividePluginWebClientPluginWebClientResponsePluginWebSocketPlugin 都直接跳過了, 能理解, 畢竟走的 RpcType 型別為 dubbo.


最後再關注下執行緒變動資訊, 因為有服務呼叫肯定應該是要非同步的, 這裡不是在呼叫服務就是準備呼叫, 總之它不老實就是了:

在這裡插入圖片描述

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-CuZozXme-1611073192284)(images/image-20210119200644944.png)]

可以從以下截圖看到, 外掛呼叫在 BodyParamPluginAlibabaDubboPlugin 間執行緒發生變動, 看來 BodyParamPlugin 是關鍵的請求轉發外掛, 我們從它來開刀, 跟蹤它的程式碼.

PS: 這裡注意下, 此處 BodyParamPlugin 完整路徑為: org.dromara.soul.plugin.alibaba.dubbo.param


public class BodyParamPlugin implements SoulPlugin {
	@Override
  public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    final ServerHttpRequest request = exchange.getRequest();
    final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
    if (Objects.nonNull(soulContext) && RpcTypeEnum.DUBBO.getName().equals(soulContext.getRpcType())) {
      // 獲取請求頭的 contentType
      MediaType mediaType = request.getHeaders().getContentType();
      ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
      return serverRequest.bodyToMono(String.class)
        .switchIfEmpty(Mono.defer(() ->
                                  // 切換執行緒了
                                  Mono.just(""))
                      )
        .flatMap(body -> {
          // 判斷 contentType 是否 json, 是則將 body 資訊注入上下文, 用 key 標識是 dubbo引數
          if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
            exchange.getAttributes().put(Constants.DUBBO_PARAMS, body);
          }
          // 繼續鏈呼叫
          return chain.execute(exchange);
        });
    }
    return chain.execute(exchange);
  }
}

沒看到關鍵的呼叫服務的資訊, 繼續看下個外掛 AlibabaPlugin (僅保留關鍵程式碼).

public class AlibabaDubboPlugin extends AbstractSoulPlugin {

  @Override
  protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
    // 獲取 BodyParamPlugin 注入的 body 引數
    String body = exchange.getAttribute(Constants.DUBBO_PARAMS);
    // 獲取元資料資訊(路徑、rpcType等)
    MetaData metaData = exchange.getAttribute(Constants.META_DATA);
    // Dubbo服務呼叫
    Object result = alibabaDubboProxyService.genericInvoker(body, metaData);
    // ..
    exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, result);
    return chain.execute(exchange);
  }
}

看來 alibabaDubboProxyService.genericInvoker() 這句就是呼叫關鍵了, 追蹤被呼叫的 AlibabaDubboProxyService (僅保留核心程式碼):

public class AlibabaDubboProxyService {
  
  public Object genericInvoker(final String body, final MetaData metaData) throws SoulException {
    // 這裡是關鍵, 通過服務名在快取中獲得服務資訊
    ReferenceConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getServiceName());
    GenericService genericService = reference.get();
    // ...
    return genericService.$invoke(metaData.getMethodName(), new String[]{}, new Object[]{});
  }
}

方法的第一句就是關鍵, debug看看是什麼資訊:

<dubbo:reference protocol="dubbo" interface="org.dromara.soul.test.dubbo.api.service.DubboTestService" uniqueServiceName="org.dromara.soul.test.dubbo.api.service.DubboTestService" generic="true" generic="true" timeout="10000" id="org.dromara.soul.test.dubbo.api.service.DubboTestService" />

這裡直接從閘道器快取裡, 獲得了dubbo服務的介面相關配置, 那這個快取的資料又是怎麼來的呢? 還記得啟動閘道器時, 列印的日誌資訊麼, 程式碼裡也能看到 ApplicationConfigCache 的存在. 我們通過那個日誌, 追溯下這個類的具體生成快取的方法:

public final class ApplicationConfigCache {
  
	public ReferenceConfig<GenericService> build(final MetaData metaData) {
    ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
    reference.setGeneric(true);
    reference.setApplication(applicationConfig);
    reference.setRegistry(registryConfig);
    reference.setInterface(metaData.getServiceName());
    reference.setProtocol("dubbo");
    // ...
    Object obj = reference.get();
    if (obj != null) {
      // 通過這行日誌定位方法
      log.info("init aliaba dubbo reference success there meteData is :{}", metaData.toString());
      // 服務名稱與資訊放入快取
      cache.put(metaData.getServiceName(), reference);
    }
    return reference;
  }
}

這裡是載入快取的地方, 那麼這個方法是怎麼被呼叫的? 追溯到 AlibabaDubboMetaDataSubscriber :

public class AlibabaDubboMetaDataSubscriber implements MetaDataSubscriber {
  private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();

  @Override
  public void onSubscribe(final MetaData metaData) {
    if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
      // ...
      // 這個方法最終會調到 build()
      ApplicationConfigCache.getInstance().initRef(metaData);
      META_DATA.put(metaData.getPath(), metaData);
    }
  }
}

其實看到它的介面類 MetaDataSubscriber 就能知道個大概了, 這又是個實現了元資料更新訂閱的類, 會接收 soul-admin 管理後臺推送的元資料資訊, 篩選出 RpcType 為 Dubbo 型別的元資料進行快取更新.

@RequiredArgsConstructor
public class MetaDataHandler extends AbstractDataHandler<MetaData> {
    
  private final List<MetaDataSubscriber> metaDataSubscribers;

  @Override
  protected void doRefresh(final List<MetaData> dataList) {
    metaDataSubscribers.forEach(MetaDataSubscriber::refresh);
    // 遍歷所有元資料更新訂閱類
    dataList.forEach(metaData -> metaDataSubscribers.forEach(metaDataSubscriber -> metaDataSubscriber.onSubscribe(metaData)));
  }

  // ...
}

最終的呼叫 dubbo 服務的程式碼這裡再貼下, 都是 Alibaba Dubbo 框架的內容了, 不深入分析:

public class AlibabaDubboProxyService {
  
	public Object genericInvoker(final String body, final MetaData metaData) throws SoulException {
  // ...
  GenericService genericService = reference.get();
    if (null == body || "".equals(body) || "{}".equals(body) || "null".equals(body)) {
      return genericService.$invoke(metaData.getMethodName(), new String[]{}, new Object[]{});
    } else {
      Pair<String[], Object[]> pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
      return genericService.$invoke(metaData.getMethodName(), pair.getLeft(), pair.getRight());
    }
	}
}



ApacheDubboPlugin


開始分析 ApacheDubboPlugin 前, 先確認下 soul-bootstrap 閘道器的 pom.xml 檔案, apache-dubbo相關的依賴是否啟用, 尤其注意看 soul-spring-boot-starter-plugin-apache-dubbo 是否啟用.

啟動三個專案 soul-adminsoul-bootstrapsoul-test-apache-dubbo-service .


啟動時報錯 Duplicate key

打臉的是, 閘道器立馬報了個錯:

Caused by: java.lang.IllegalStateException: Duplicate key org.dromar[email protected]4e83a98
	...
	at org.dromara.soul.plugin.base.cache.CommonPluginDataSubscriber.<init>(CommonPluginDataSubscriber.java:46) ~[classes/:na]
	at org.dromara.soul.web.configuration.SoulConfiguration.pluginDataSubscriber(SoulConfiguration.java:97) ~[classes/:na]

不要緊張這些情況都是小case, 看到 CommonPluginDataSubscriber 這個類且看過我3期的分析文章 的同學, 應該會有種熟悉感… 不怕, 從頭梳理下流程你就懂了.


首先 SoulConfiguration 配置類會載入一個叫做 pluginDataSubscriber 的 Bean, 用作外掛資料的訂閱, 當然訂閱物件是後臺管理系統:

public class SoulConfiguration {
	@Bean
  public PluginDataSubscriber pluginDataSubscriber(final ObjectProvider<List<PluginDataHandler>> pluginDataHandlerList) {
    return new CommonPluginDataSubscriber(pluginDataHandlerList.getIfAvailable(Collections::emptyList));
  }
}

這裡的入參由來, 會藉助 spring4.X 的機制獲得所有父類為 PluginDataHandler 的實現類, 自然也會找到 CommonPluginDataSubscriber :

public class CommonPluginDataSubscriber implements PluginDataSubscriber {
	// ...
}

CommonPluginDataSubscriber 的構造器開始工作, 接收 pluginDataHandlerList 入參, 將這些 Bean 物件轉換成一個快取map:

private final Map<String, PluginDataHandler> handlerMap;

public CommonPluginDataSubscriber(final List<PluginDataHandler> pluginDataHandlerList) {
  // 異常問題就是此行
  this.handlerMap = pluginDataHandlerList.stream().collect(Collectors.toConcurrentMap(PluginDataHandler::pluginNamed, e -> e));
}

我們的異常就是在這裡了, 構建hash時key重複了, 說明 pluginDataHandlerList 這個入參的 pluginName 屬性有重複. 那麼這個入參怎麼來的呢? 自然是 spring注入的, 找到這些 Bean 即可.


答案就在我們閘道器中引入的 soul-spring-boot-starter-plugin-xx 專案中, 我現在不僅引入了 alibaba-dubbo , 也引入了 apache-dubbo , 這兩個 PluginDataHandler 實現子類衝突了:

public class AlibabaDubboPluginDataHandler implements PluginDataHandler {

  @Override
  public String pluginNamed() {
    return PluginEnum.DUBBO.getName();
  }
}
public class ApacheDubboPluginDataHandler implements PluginDataHandler {
	@Override
  public String pluginNamed() {
    return PluginEnum.DUBBO.getName();
  }
}

怎麼解決呢? 在閘道器中遮蔽調 soul-spring-boot-starter-plugin-alibaba-dubbo 這個依賴即可, 它的 AlibabaDubboPluginConfiguration 配置工廠不工作了自然不會有 AlibabaDubboPluginDataHandler 這個Bean了.

PS: 我挺想改了其中一個 pluginNamed() 返回值, 但鬼知道哪裡會有這個列舉的使用, 不作死了.



正題

照舊找到 SoulWebHandlerexecute() 方法, 看看呼叫的外掛鏈長啥樣:

plugins = {[email protected]}  size = 11
 0 = {[email protected]} 
 1 = {[email protected]} 
 2 = {[email protected]} 
 3 = {[email protected]} 
 4 = {[email protected]} 
 5 = {[email protected]} 
 6 = {[email protected]} 
 7 = {[email protected]} 
 8 = {[email protected]} 
 9 = {[email protected]} 
 10 = {[email protected]} 

注意: 這裡的 BodyParamPlugin 完整路徑 org.dromara.soul.plugin.apache.dubbo.param


除了把 BodyParamPluginAlibabaDubboPlugin 換成 Apache包裡的 BodyParamPluginApacheDubboPlugin , 就沒啥區別了. 再具體看看 BodyParamPlugin 有什麼不同:

public class BodyParamPlugin implements SoulPlugin {

  // .. 
  @Override
  public String named() {
    return "apache-dubbo-body-param";
  }
}

除了 named() 方法返回的字串不一樣, 沒看出其他區別. 各種變數的類路徑也都一樣. 看看 ApacheDubboPlugin … 終於看出不一樣了, 主要是呼叫的樁實現上不同 (僅保留核心程式碼):

public class ApacheDubboProxyService {

  public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
    // ...
    GenericService genericService = reference.get();
    Pair<String[], Object[]> pair = new ImmutablePair<>(new String[]{}, new Object[]{});
    // 使用 apache dubbo 的非同步回撥
    CompletableFuture<Object> future = genericService.$invokeAsync(metaData.getMethodName(), pair.getLeft(), pair.getRight());
    return Mono.fromFuture(future.thenApply(ret -> {
      if (Objects.nonNull(ret)) {
        exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, ret);
      } else {
        exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, Constants.DUBBO_RPC_RESULT_EMPTY);
      }
      exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
      return ret;
    }));
  }
}

ApacheDubboPlugin 對服務的呼叫是非同步回撥的模型 $invokeAsync(), 不用阻塞執行緒等待結果. 而 AlibabaDubboPlugin 在這塊的呼叫模型則是同步的 $invoke() .

這塊我其實挺有疑惑的, 沒研究過 Apache-dubbo 和 Alibaba-dubbo, 不太清楚為什麼一個支援$invokeAsync() 而另一個僅支援 $invoke() . 今後會了解一二再出個番外篇.