Motan系列-Motan的服務呼叫
Motan系列文章
0 @MotanReferer註解是個啥
被 @MotanReferer
標註的 setter 方法或 field 會被motan在啟動時掃描,併為其建立動態代理,並將動態代理的例項賦值給這個 field。遠端服務的呼叫都是在這個代理中實現的。
下面以註解在 field 的情況為例,說明 @MotanReferer
的解析以及建立動態代理的過程:
@MotanReferer(basicReferer = "ad-commonBasicRefererConfigBean" ,application = "ad-filter",version = "1.1.0")
private AdCommonRPC adCommonRPC;
複製程式碼
關於如何掃描,在 Motan如何完成與Spring的整合 一文中有詳細說明,這裡不再贅述。
1 @MotanReferer註解的解析
這個過程始於 AnnotationBean
中對掃描到的bean的field的解析。Motan會解析出帶有 @MotanReferer
的field,應呼叫 AnnotationBean 的 refer
方法初始化並建立代理。field的解析如下:
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
try {
if (!field.isAccessible()) {
field.setAccessible(true);
}
MotanReferer reference = field.getAnnotation(MotanReferer.class);
if (reference != null) {
// 呼叫 refer 方法初始化並建立動態代理
// 並將field的引用指向這個代理物件
Object value = refer(reference,field.getType());
if (value != null) {
field.set(bean,value);
}
}
} catch (Throwable t) {
throw new BeanInitializationException("Failed to init remote service reference at filed " + field.getName()
+ " in class " + bean.getClass().getName(),t);
}
}
複製程式碼
2 @MotanReferer的初始化
類似於服務註冊的過程,MotanReferer是通過 RefererConfigBean
類來管理配置、註冊中心、URL、HA、LoadBalance、Proxy等資源的。
還是先來個 RefererConfigBean
的UML,熟悉一下整個體系都有啥東西。
其中,註冊中心、URL、Protocol、HA、LoadBalance策略等都是在 RefererConfig
的 clusterSupports
中管理的。
來繼續看這個refer方法,這個方法中首先將 @MotanReferer
註解中的配置資訊解析到 RefererConfigBean
中,然後依然是呼叫 afterPropertiesSet()
方法做一些校驗,最後呼叫 RefererConfigBean
的 getRef()
方法,各個元件的初始化以及Proxy都在這裡建立。
private <T> Object refer(MotanReferer reference,Class<?> referenceClass) {
// 解析介面名
String interfaceName;
if (!void.class.equals(reference.interfaceClass())) {
interfaceName = reference.interfaceClass().getName();
} else if (referenceClass.isInterface()) {
interfaceName = referenceClass.getName();
} else {
throw new IllegalStateException("The @Reference undefined interfaceClass or interfaceName,and the property type "
+ referenceClass.getName() + " is not a interface.");
}
String key = reference.group() + "/" + interfaceName + ":" + reference.version();
RefererConfigBean<T> referenceConfig = referenceConfigs.get(key);
if (referenceConfig == null) {
referenceConfig = new RefererConfigBean<T>();
referenceConfig.setBeanFactory(beanFactory);
if (void.class.equals(reference.interfaceClass())
&& referenceClass.isInterface()) {
referenceConfig.setInterface((Class<T>) referenceClass);
} else if (!void.class.equals(reference.interfaceClass())) {
referenceConfig.setInterface((Class<T>) reference.interfaceClass());
}
if (beanFactory != null) {
// ... 省略,初始化 @MotanReferer的配置資訊
try {
// 校驗basicReferer配置、Protocol、Registry配置,與服務註冊過程相似
referenceConfig.afterPropertiesSet();
} catch (RuntimeException e) {
throw (RuntimeException) e;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(),e);
}
}
referenceConfigs.putIfAbsent(key,referenceConfig);
referenceConfig = referenceConfigs.get(key);
}
// 建立Proxy
return referenceConfig.getRef();
}
複製程式碼
getRef()
方法中實際是呼叫 initRef()
方法來建立Proxy的。
public synchronized void initRef() {
// ... 校驗 interface 和 protocols 是否非空
checkInterfaceAndMethods(interfaceClass,methods);
clusterSupports = new ArrayList<>(protocols.size());
List<Cluster<T>> clusters = new ArrayList<>(protocols.size());
String proxy = null;
ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);
// 解析註冊中心地址
List<URL> registryUrls = loadRegistryUrls();
// 解析本機IP
String localIp = getLocalHostAddress(registryUrls);
for (ProtocolConfig protocol : protocols) {
Map<String,String> params = new HashMap<>();
params.put(URLParamType.nodeType.getName(),MotanConstants.NODE_TYPE_REFERER);
params.put(URLParamType.version.getName(),URLParamType.version.getValue());
params.put(URLParamType.refreshTimestamp.getName(),String.valueOf(System.currentTimeMillis()));
collectConfigParams(params,protocol,basicReferer,extConfig,this);
collectMethodConfigParams(params,this.getMethods());
String path = StringUtils.isBlank(serviceInterface) ? interfaceClass.getName() : serviceInterface;
URL refUrl = new URL(protocol.getName(),localIp,MotanConstants.DEFAULT_INT_VALUE,path,params);
// 初始化ClusterSupport
ClusterSupport<T> clusterSupport = createClusterSupport(refUrl,configHandler,registryUrls);
clusterSupports.add(clusterSupport);
clusters.add(clusterSupport.getCluster());
if (proxy == null) {
// 獲取建立proxy的方式,預設是JDK動態代理
String defaultValue = StringUtils.isBlank(serviceInterface) ? URLParamType.proxy.getValue() : MotanConstants.PROXY_COMMON;
proxy = refUrl.getParameter(URLParamType.proxy.getName(),defaultValue);
}
}
// 建立代理
ref = configHandler.refer(interfaceClass,clusters,proxy);
initialized.set(true);
}
複製程式碼
可以發現,又呼叫了 configHandler.refer
方法,預設情況下,這個proxy引數的值是"jdk",即使用JDK自身的動態代理功能建立代理。
另外一個比較重要的類是 ClusterSupport
,這個類封裝了下面這些資訊:
private static ConcurrentHashMap<String,Protocol> protocols = new ConcurrentHashMap<String,Protocol>();
// 叢集支援
private Cluster<T> cluster;
// 註冊中心URL
private List<URL> registryUrls;
// 遠端呼叫URL
private URL url;
private Class<T> interfaceClass;
// 使用的協議
private Protocol protocol;
private ConcurrentHashMap<URL,List<Referer<T>>> registryReferers = new ConcurrentHashMap<URL,List<Referer<T>>>();
複製程式碼
其中,cluster
在motan的具體實現實際上是 ClusterSpi
這個類,他封裝了以下資訊:
public class ClusterSpi<T> implements Cluster<T> {
// 高可用策略
private HaStrategy<T> haStrategy;
// 負載均衡策略
private LoadBalance<T> loadBalance;
private List<Referer<T>> referers;
private AtomicBoolean available = new AtomicBoolean(false);
private URL url;
}
複製程式碼
這些東西在上面的 createClusterSupport
方法中生成好了,這裡先不關注Cluster、Ha、LoadBalance等,本文主要關注代理的建立及RPC呼叫。繼續看 configHandler.refer
這個方法。
public <T> T refer(Class<T> interfaceClass,List<Cluster<T>> clusters,String proxyType) {
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(proxyType);
return proxyFactory.getProxy(interfaceClass,clusters);
}
複製程式碼
又是一個SPI擴充套件,預設使用的下面這個JDK的ProxyFactory。
@SpiMeta(name = "jdk")
public class JdkProxyFactory implements ProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clz,List<Cluster<T>> clusters) {
return (T) Proxy.newProxyInstance(clz.getClassLoader(),new Class[]{clz},new RefererInvocationHandler<>(clz,clusters));
}
}
複製程式碼
OK,到這裡代理就建立好了。這個代理的例項最終會被 @MotanReferer 註解的field引用。初始化過程結束。
這裡要明確,最終是通過 RefererInvocationHandler
這個類建立的代理。
3 RPC呼叫
既然 RefererInvocationHandler
代理了我們的目標介面,那麼介面的每個方法呼叫都會走到這個代理類中。所以接下來主要關注代理是咋完成RPC呼叫的。
這裡只給出關鍵程式碼:
public Object invoke(Object proxy,Method method,Object[] args) throws Throwable {
// 省略 local method 部分
DefaultRequest request = new DefaultRequest();
request.setRequestId(RequestIdGenerator.getRequestId());
request.setArguments(args);
String methodName = method.getName();
boolean async = false; // 非同步呼叫支援,暫不關注
if (methodName.endsWith(MotanConstants.ASYNC_SUFFIX) && method.getReturnType().equals(ResponseFuture.class)) {
methodName = MotanFrameworkUtil.removeAsyncSuffix(methodName);
async = true;
}
request.setMethodName(methodName);
request.setParamtersDesc(ReflectUtil.getMethodParamDesc(method));
request.setInterfaceName(interfaceName);
return invokeRequest(request,getRealReturnType(async,this.clz,method,methodName),async);
}
複製程式碼
這個方法先將RPC的相關資訊封裝到 DefaultRequest
中,然後呼叫 invokeRequest
方法。
Object invokeRequest(Request request,Class returnType,boolean async) throws Throwable {
RpcContext curContext = RpcContext.getContext();
// 省略 初始化 RpcContext
// 當 referer配置多個protocol的時候,比如A,B,C,
// 那麼正常情況下只會使用A,如果A被開關降級,那麼就會使用B,B也被降級,那麼會使用C
for (Cluster<T> cluster : clusters) {
// 如果開關處於關閉狀態,不會去呼叫這個遠端機器
String protocolSwitcher = MotanConstants.PROTOCOL_SWITCHER_PREFIX + cluster.getUrl().getProtocol();
Switcher switcher = switcherService.getSwitcher(protocolSwitcher);
if (switcher != null && !switcher.isOn()) {
continue;
}
request.setAttachment(URLParamType.version.getName(),cluster.getUrl().getVersion());
request.setAttachment(URLParamType.clientGroup.getName(),cluster.getUrl().getGroup());
// 帶上client的application和module
request.setAttachment(URLParamType.application.getName(),cluster.getUrl().getApplication());
request.setAttachment(URLParamType.module.getName(),cluster.getUrl().getModule());
Response response = null;
boolean throwException = Boolean.parseBoolean(cluster.getUrl().getParameter(URLParamType.throwException.getName(),URLParamType.throwException.getValue()));
try {
MotanFrameworkUtil.logEvent(request,MotanConstants.TRACE_INVOKE);
// 執行呼叫
response = cluster.call(request);
if (async) {
// 省略非同步呼叫的支援
} else {
Object value = response.getValue();
if (value != null && value instanceof DeserializableObject) {
try {
value = ((DeserializableObject) value).deserialize(returnType);
} catch (IOException e) {
LoggerUtil.error("deserialize response value fail! deserialize type:" + returnType,e);
throw new MotanFrameworkException("deserialize return value fail! deserialize type:" + returnType,e);
}
}
return value;
}
} catch (RuntimeException e) {
// 異常處理,包括處理是否向上遊服務丟擲
}
}
throw new MotanServiceException("Referer call Error: cluster not exist,interface=" + interfaceName + " " + MotanFrameworkUtil.toString(request),MotanErrorMsgConstant.SERVICE_UNFOUND);
}
複製程式碼
cluster.call()
public Response call(Request request) {
if (available.get()) {
try {
// haStrategy是通過SPI來管理的,預設的HA策略是 failover
// 即呼叫失敗時,自動嘗試其他伺服器
return haStrategy.call(request,loadBalance);
} catch (Exception e) {
return callFalse(request,e);
}
}
return callFalse(request,new MotanServiceException(MotanErrorMsgConstant.SERVICE_UNFOUND));
}
複製程式碼
haStrategy.call()
public Response call(Request request,LoadBalance<T> loadBalance) {
// refer列表
List<Referer<T>> referers = selectReferers(request,loadBalance);
if (referers.isEmpty()) {
throw new MotanServiceException(String.format("FailoverHaStrategy No referers for request:%s,loadbalance:%s",request,loadBalance));
}
URL refUrl = referers.get(0).getUrl();
// 這裡是配置中配置的 retries 重試次數,預設:0
int tryCount =
refUrl.getMethodParameter(request.getMethodName(),request.getParamtersDesc(),URLParamType.retries.getName(),URLParamType.retries.getIntValue());
// 如果有問題,則設定為不重試
if (tryCount < 0) {
tryCount = 0;
}
for (int i = 0; i <= tryCount; i++) {
Referer<T> refer = referers.get(i % referers.size());
try {
request.setRetries(i);
return refer.call(request); // RPC
} catch (RuntimeException e) {
// 對於業務異常,直接丟擲
if (ExceptionUtil.isBizException(e)) {
throw e;
} else if (i >= tryCount) {
throw e;
}
LoggerUtil.warn(String.format("FailoverHaStrategy Call false for request:%s error=%s",e.getMessage()));
}
}
throw new MotanFrameworkException("FailoverHaStrategy.call should not come here!");
}
複製程式碼
然後,refer.call()
public Response call(Request request) {
if (!isAvailable()) {
throw new MotanFrameworkException(this.getClass().getSimpleName() + " call Error: node is not available,url=" + url.getUri()
+ " " + MotanFrameworkUtil.toString(request));
}
// 增加目標server的連線數,用於loadBalance
incrActiveCount(request);
Response response = null;
try {
response = doCall(request); // do rpc
return response;
} finally {
// 呼叫完要將目標server的連線數-1
decrActiveCount(request,response);
}
}
複製程式碼
到這裡,doCall
方法就是通過Netty呼叫RPC了。