ribbon原始碼之客戶端
阿新 • • 發佈:2018-11-15
客戶端模組的核心功能是提供統一的使用者請求操作介面。
介面定義
客戶端模組的核心是IClient介面,定義了客戶端網路請求的方法。
public interface IClient<S extends ClientRequest, T extends IResponse> { public T execute(S request, IClientConfig requestConfig) throws Exception; }
ClientRequest為客戶端定義的請求體,儲存了請求uri、loadbalancer的key,是否重試、配置。
public class ClientRequest implements Cloneable { protected URI uri; protected Object loadBalancerKey = null; protected Boolean isRetriable = null; protected IClientConfig overrideConfig; }
IResponse為客戶端定義的響應內容的介面。
public interface IResponse extends Closeable { publicObject getPayload() throws ClientException; public boolean hasPayload(); public boolean isSuccess(); public URI getRequestedURI(); public Map<String, ?> getHeaders(); }
IClientConfigAware定義了需要使用IClientConfig初始化IClient的方法。
public interface IClientConfigAware { publicabstract void initWithNiwsConfig(IClientConfig clientConfig); }
類圖
客戶端工廠類
客戶端模組提供了一個客戶端工廠類(ClientFactory)用於通過配置檔案來建立IClient例項和負載均衡器(ILoadBalancer)例項。
public static synchronized IClient getNamedClient(String name) {//根據配置獲取iclient例項,預設使用DefaultClientConfigImpl配置類。 return getNamedClient(name, DefaultClientConfigImpl.class); }
public static synchronized IClient getNamedClient(String name, Class<? extends IClientConfig> configClass) {
...
return createNamedClient(name, configClass);
...
}
public static synchronized IClient createNamedClient(String name, Class<? extends IClientConfig> configClass) throws ClientException {
IClientConfig config = getNamedConfig(name, configClass);//例項化配置類
return registerClientFromProperties(name, config);//通過配置檔案建立iclient
}
public static synchronized ILoadBalancer getNamedLoadBalancer(String name) {
return getNamedLoadBalancer(name, DefaultClientConfigImpl.class);
}
public static synchronized ILoadBalancer getNamedLoadBalancer(String name, Class<? extends IClientConfig> configClass) {
...
lb = registerNamedLoadBalancerFromProperties(name, configClass);
...
}
public static synchronized IClient<?, ?> registerClientFromProperties(String restClientName, IClientConfig clientConfig) throws ClientException { IClient<?, ?> client = null; ILoadBalancer loadBalancer = null; ... String clientClassName = (String) clientConfig.getProperty(CommonClientConfigKey.ClientClassName);//通過配置檔案獲取client的實現類 client = (IClient<?, ?>) instantiateInstanceWithClientConfig(clientClassName, clientConfig); //通過配置檔案建立client例項 boolean initializeNFLoadBalancer = Boolean.parseBoolean(clientConfig.getProperty( CommonClientConfigKey.InitializeNFLoadBalancer, DefaultClientConfigImpl.DEFAULT_ENABLE_LOADBALANCER).toString()); if (initializeNFLoadBalancer) {//如果需要初始化負載均衡器,則通過配置檔案建立一個負載均衡器 loadBalancer = registerNamedLoadBalancerFromclientConfig(restClientName, clientConfig); } if (client instanceof AbstractLoadBalancerAwareClient) {//如果client實現AbstractLoadBalancerAwareClient,則注入負載均衡器 ((AbstractLoadBalancerAwareClient) client).setLoadBalancer(loadBalancer); } ...return client; }
public static ILoadBalancer registerNamedLoadBalancerFromclientConfig(String name, IClientConfig clientConfig) throws ClientException {
...
ILoadBalancer lb = null;
...
String loadBalancerClassName = (String) clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerClassName);//
lb = (ILoadBalancer) ClientFactory.instantiateInstanceWithClientConfig(loadBalancerClassName, clientConfig);
...
return lb;
...
}
//初始化指定的class類
public static Object instantiateInstanceWithClientConfig(String className, IClientConfig clientConfig)
throws InstantiationException, IllegalAccessException, ClassNotFoundException {
Class clazz = Class.forName(className);
if (IClientConfigAware.class.isAssignableFrom(clazz)) {//如果指定的iclient實現了IClientConfigAware,ClientFactory在建立時會使用IClientConfig進行初始化。
IClientConfigAware obj = (IClientConfigAware) clazz.newInstance();
obj.initWithNiwsConfig(clientConfig);
return obj;
} else {
try {
if (clazz.getConstructor(IClientConfig.class) != null) {
return clazz.getConstructor(IClientConfig.class).newInstance(clientConfig);
}
} catch (Throwable e) { // NOPMD
}
}
return clazz.newInstance();
}
使用客戶端工廠類(ClientFactory)涉及的配置:
屬性 | 實現 | 預設值 |
clientname.ribbon.ClientClassName | client使用的IClient實現類 | com.netflix.niws.client.http.RestClient |
clientname.ribbon.InitializeNFLoadBalancer | 是否初始化負載均衡器 | true |
clientname.ribbon.NFLoadBalancerClassName | 負載均衡器的實現類 | com.netflix.loadbalancer.ZoneAwareLoadBalancer |
類圖
客戶端實現類
AbstractLoadBalancerAwareClient實現了通過負載均衡器進行請求呼叫。LoadBalancerCommand對負載均衡器操作進行了模版,對請求呼叫提供了回撥函式。
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig); ...return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri);//設定最終的呼叫uri。 try { return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } //呼叫execute方法執行請求呼叫。 catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); ... }
LoadBalancerCommand呼叫了負載均衡器獲得了一個server,然後呼叫回撥函式執行請求。此外還提供了各個關鍵節點的監聽器和異常重試機制。
public Observable<T> submit(final ServerOperation<T> operation) { final ExecutionInfoContext context = new ExecutionInfoContext(); if (listenerInvoker != null) {//執行回撥介面 try { listenerInvoker.onExecutionStart(); } catch (AbortExecutionException e) { return Observable.error(e); } } final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer(); final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer(); Observable<T> o = (server == null ? selectServer() : Observable.just(server))//呼叫負載均衡器獲得目標server .concatMap(new Func1<Server, Observable<T>>() { ...return operation.call(server).doOnEach(new Observer<T>() { .... }); ... if (maxRetrysSame > 0) o = o.retry(retryPolicy(maxRetrysSame, true)); return o; } }); if (maxRetrysNext > 0 && server == null) o = o.retry(retryPolicy(maxRetrysNext, false)); return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() { @Override public Observable<T> call(Throwable e) { if (context.getAttemptCount() > 0) { if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) { e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED, "Number of retries on next server exceeded max " + maxRetrysNext + " retries, while making a call for: " + context.getServer(), e); } else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) { e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED, "Number of retries exceeded max " + maxRetrysSame + " retries, while making a call for: " + context.getServer(), e); } } if (listenerInvoker != null) { listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo()); } return Observable.error(e); } }); }
private Observable<Server> selectServer() { return Observable.create(new OnSubscribe<Server>() { @Override public void call(Subscriber<? super Server> next) { try { Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey); next.onNext(server); next.onCompleted(); } catch (Exception e) { next.onError(e); } } }); }
子類 HttpRequest用於http請求,內部定義了http請求的各個內容,並且使用了builder模式。
public class HttpRequest extends ClientRequest {protected CaseInsensitiveMultiMap httpHeaders = new CaseInsensitiveMultiMap();//head引數
protected Multimap<String, String> queryParams = ArrayListMultimap.create();//query引數
private Object entity;//訊息體
protected Verb verb;//http請求的method:post get head delete等。預設get }
類圖