Spring Cloud Ribbon 全解 (3)
上一篇我們瞭解到Ribbon主要由如下幾個元件組成:
- 所有Ribbon負載均衡器需要實現的介面IClient
- 服務例項列表維護機制實現的介面ServerList
- 負載均衡資料記錄LoadBalancerStats
- 負責選取Server的介面ILoadBalancer
- 負載均衡選取規則實現的介面IRule
- 檢查例項是否存活實現的介面IPing
- 服務例項列表更新機制實現的介面ServerListUpdater
- 服務例項列表過濾機制ServerListFilter
我們會逐個分析
1. 所有Ribbon負載均衡器需要實現的介面IClient
對於這個IClient,之前我們說到執行器邏輯,例如重試還有異常處理,都在這裡處理。我們看他的預設抽象類實現AbstractLoadBalancerAwareClient:
AbstractLoadBalancerAwareClient.java
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
//獲取重試處理器,這個由其他實現類動態實現
RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
//構造LoadBalancerCommand,RxJava風格
LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
.withLoadBalancerContext(this)
.withRetryHandler(handler)
.withLoadBalancerURI(request.getUri())
.build();
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
//修改原始url為實際的url
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
//執行請求
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
public abstract RequestSpecificRetryHandler getRequestSpecificRetryHandler(S request, IClientConfig requestConfig);
這個構造的LoadBalancerCommand是一個RxJava風格的,它包含了重試和異常處理機制:
LoadBalancerCommand.java
//返回一個只包含一個Server的Observable,但是每次從負載均衡器中獲取一個
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
//獲取在每個服務例項重試的的次數
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
//最多嘗試幾個服務例項
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
//對於每個服務例項的呼叫邏輯
//預設field server是null,通過selectServer()方法獲取一個Server
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
//對於每個Server,按順序對映為對於每個Server包含重試邏輯的請求呼叫
public Observable<T> call(Server server) {
//設定上下文
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);
//每個Server包含重試邏輯的請求呼叫
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
context.incAttemptCount();
//增加Server正在處理的請求計數
loadBalancerContext.noteOpenConnection(stats);
//監聽器
if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
//計時器
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
//operation.call(server)就是剛剛分析的AbstractLoadBalancerAwareClient傳過來的ServerOperation,就是直接對這個Server呼叫請求
//doOnEach的操作就是記錄請求前後的一些資料用於負載均衡資料統計
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onCompleted() {
//記錄請求完成
recordStats(tracer, stats, entity, null);
}
@Override
public void onError(Throwable e) {
//記錄請求結束
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server);
//發生了錯誤,通知listener
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}
@Override
public void onNext(T entity) {
//因為只有呼叫請求成功只有一個結果(只有一個請求), 這裡的entity就是結果,只要收到結果就代表請求成功
this.entity = entity;
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop();
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});
if (maxRetrysSame > 0)
//是否retry
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});
if (maxRetrysNext > 0 && server == null)
//是否retry,如果retry回撥用selectServer()返回下一個Server
o = o.retry(retryPolicy(maxRetrysNext, false));
//異常處理
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
@Override
public Observable<T> call(Throwable e) {
if (context.getAttemptCount() > 0) {
//如果超過重試次數,則拋異常
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"Number of retries on next server exceeded max " + maxRetrysNext
+ " retries, while making a call for: " + context.getServer(), e);
}
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"Number of retries exceeded max " + maxRetrysSame
+ " retries, while making a call for: " + context.getServer(), e);
}
}
if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
}
});
}
2. 服務例項列表維護機制實現的介面ServerList
AbstractServerList.java
其實這個抽象類一是在實現ServerList介面的同時,實現了IClientConfigAware這個介面,代表是可配置的。
同時,提供了一個生成預設ServerListFilter(這個Filter的實現類是由NIWSServerListFilterClassName這個配置決定,預設是ZoneAffinityServerListFilter)的方法
public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {
public AbstractServerListFilter<T> getFilterImpl(IClientConfig niwsClientConfig) throws ClientException{
try {
String niwsServerListFilterClassName = niwsClientConfig
.getProperty(
CommonClientConfigKey.NIWSServerListFilterClassName,
ZoneAffinityServerListFilter.class.getName())
.toString();
AbstractServerListFilter<T> abstractNIWSServerListFilter =
(AbstractServerListFilter<T>) ClientFactory.instantiateInstanceWithClientConfig(niwsServerListFilterClassName, niwsClientConfig);
return abstractNIWSServerListFilter;
} catch (Throwable e) {
throw new ClientException(
ClientException.ErrorType.CONFIGURATION,
"Unable to get an instance of CommonClientConfigKey.NIWSServerListFilterClassName. Configured class:"
+ niwsClientConfig
.getProperty(CommonClientConfigKey.NIWSServerListFilterClassName), e);
}
}
}
ConfigurationBasedServerList.java
這個是預設的實現,如果沒有特殊配置,ServerList的實現類就是ConfigurationBasedServerList;這個實際上就是從配置中讀取ServerList,這個配置可以是動態配置,例如是Archaius
public class ConfigurationBasedServerList extends AbstractServerList<Server> {
private IClientConfig clientConfig;
@Override
public List<Server> getInitialListOfServers() {
return getUpdatedListOfServers();
}
@Override
public List<Server> getUpdatedListOfServers() {
String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);
return derive(listOfServers);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
this.clientConfig = clientConfig;
}
//可以看出這個配置就是以逗號分隔的字串
private List<Server> derive(String value) {
List<Server> list = Lists.newArrayList();
if (!Strings.isNullOrEmpty(value)) {
for (String s: value.split(",")) {
list.add(new Server(s.trim()));
}
}
return list;
}
}
DiscoveryEnabledNIWSServerList.java
這個就是從Eureka上面獲取Server列表的類,構造的時候需要傳入相關配置以及最重要的EurekaClient的Provider來獲取合適的EurekaClient以便於獲取Server列表。
實現ServerList介面的方法都是基於obtainServersViaDiscovery這個方法:
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
return obtainServersViaDiscovery();
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
//如果EurekaClient沒有被初始化,則日誌報警並返回空的列表
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
//這裡的vipAddresses其實就是微服務名稱的各種形式,但是注意,它們代表的是同一個微服務
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
//是否覆蓋port
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
//這裡複製一份是因為不希望其他的地方修改原有的例項資訊
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
serverList.add(des);
}
}
//如果有一個vipAddress有服務列表,我們就不用獲取剩餘的了
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break;
}
}
}
return serverList;
}
到這裡我們可以看出,Ribbon和Eureka的配合其實就是Ribbon從Eureka中利用微服務名稱獲取Server列表;那麼這個列表是如何更新的呢,在Eureka的章節我們提到過,Ribbon定時從EurekaClient獲取服務例項列表更新,這就涉及到了下一個我們要講到的Ribbon元素 - 服務例項列表更新機制實現的介面ServerListUpdater