9.原始碼分析---SOFARPC是如何實現故障剔除的?
SOFARPC原始碼解析系列:
1. 原始碼分析---SOFARPC可擴充套件的機制SPI
2. 原始碼分析---SOFARPC客戶端服務引用
3. 原始碼分析---SOFARPC客戶端服務呼叫
4. 原始碼分析---SOFARPC服務端暴露
5.原始碼分析---SOFARPC呼叫服務
6.原始碼分析---和dubbo相比SOFARPC是如何實現負載均衡的?
7.原始碼分析---SOFARPC是如何實現連線管理與心跳?
8.原始碼分析---從設計模式中看SOFARPC中的EventBus?
在第七講裡面7.原始碼分析---SOFARPC是如何實現連線管理與心跳?,我講了客戶端是怎麼維護服務端的長連線的。但是有一種情況是Consumer 和 Provider的長連線還在,註冊中心未下發摘除,但伺服器端由於某些原因,例如長時間的 Full GC, 硬體故障(後文中為避免重複,統一描述為機器假死)等場景,處於假死狀態。
這個時候 Consumer 應該不呼叫或少呼叫該 Provider,可以通過權重的方式來進行控制。目前 SOFARPC 5.3.0 以上的版本支援 RPC 單機故障剔除能力。SOFARPC 通過服務權重控制方式來減少異常服務的呼叫,將更多流量打到正常服務機器上,提高服務可用性。
接下來我們來講講具體的服務權重降級是怎麼實現的。在看這篇文章之前我希望讀者能讀完如下幾篇文章:
- 8.原始碼分析---從設計模式中看SOFARPC中的EventBus?,因為SOFARPC的服務權重降級是通過EventBus來呼叫的。
- 3. 原始碼分析---SOFARPC客戶端服務呼叫,這篇文章裡面寫了是如何呼叫服務端的,客戶端會在呼叫服務端的時候觸發匯流排,給訂閱者傳送一個訊息。
- 6.原始碼分析---和dubbo相比SOFARPC是如何實現負載均衡的?,這篇文章裡面寫的是SOFARPC的負載均衡是怎麼實現的,以及如何通過權重控制併發量。
如果你瞭解了上面的知識,那麼可以開始接下來的內容了。
例項
我們首先給出一個服務端和客戶端的例項,方便大家去除錯。
官方的文件在這裡:自動故障剔除
service
public static void main(String[] args) { ServerConfig serverConfig = new ServerConfig() .setProtocol("bolt") // 設定一個協議,預設bolt .setPort(12200) // 設定一個埠,預設12200 .setDaemon(false); // 非守護執行緒 ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) // 指定介面 .setRef(new HelloServiceImpl()) // 指定實現 .setServer(serverConfig); // 指定服務端 providerConfig.export(); // 釋出服務 }
client
public static void main(String[] args) {
FaultToleranceConfig faultToleranceConfig = new FaultToleranceConfig();
faultToleranceConfig.setRegulationEffective(true);
faultToleranceConfig.setDegradeEffective(true);
faultToleranceConfig.setTimeWindow(10);
faultToleranceConfig.setWeightDegradeRate(0.5);
FaultToleranceConfigManager.putAppConfig("appName", faultToleranceConfig);
ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setAppName("appName");
ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定介面
.setProtocol("bolt") // 指定協議
.setDirectUrl("bolt://127.0.0.1:12200") // 指定直連地址
.setConnectTimeout(2000 * 1000)
.setApplication(applicationConfig);
HelloService helloService = consumerConfig.refer();
while (true) {
try {
LOGGER.info(helloService.sayHello("world"));
} catch (Exception e) {
e.printStackTrace();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
自動故障剔除模組的註冊
我們在在客戶端的例子裡面通過FaultToleranceConfigManager註冊了FaultToleranceConfig配置。
FaultToleranceConfig faultToleranceConfig = new FaultToleranceConfig();
faultToleranceConfig.setRegulationEffective(true);
faultToleranceConfig.setDegradeEffective(true);
faultToleranceConfig.setTimeWindow(10);
faultToleranceConfig.setWeightDegradeRate(0.5);
FaultToleranceConfigManager.putAppConfig("appName", faultToleranceConfig);
我們先進入到FaultToleranceConfigManager裡面看看putAppConfig做了什麼。
FaultToleranceConfigManager#putAppConfig
/**
* All fault-tolerance config of apps
*/
private static final ConcurrentMap<String, FaultToleranceConfig> APP_CONFIGS = new ConcurrentHashMap<String, FaultToleranceConfig>();
public static void putAppConfig(String appName, FaultToleranceConfig value) {
if (appName == null) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("App name is null when put fault-tolerance config");
}
return;
}
if (value != null) {
APP_CONFIGS.put(appName, value);
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Get a new resource, value[" + value + "]");
}
} else {
APP_CONFIGS.remove(appName);
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Remove a resource, key[" + appName + "]");
}
}
calcEnable();
}
static void calcEnable() {
for (FaultToleranceConfig config : APP_CONFIGS.values()) {
if (config.isRegulationEffective()) {
aftEnable = true;
return;
}
}
aftEnable = false;
}
上面的方法寫的非常的清楚:
- 校驗appName,為空的話直接返回
- 然後把我們定義的config放到APP_CONFIGS這個變數裡面
- 呼叫calcEnable,根據我們配置的config,將aftEnable變數設定為true
到這裡就完成了故障剔除的配置設定。
註冊故障剔除模組
我們在8.原始碼分析---從設計模式中看SOFARPC中的EventBus?裡面講了,初始化ConsumerConfig的時候會初始化父類的靜態程式碼塊,然後會初始化RpcRuntimeContext的靜態程式碼塊。
RpcRuntimeContext
static {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Welcome! Loading SOFA RPC Framework : {}, PID is:{}", Version.BUILD_VERSION, PID);
}
put(RpcConstants.CONFIG_KEY_RPC_VERSION, Version.RPC_VERSION);
// 初始化一些上下文
initContext();
// 初始化其它模組
ModuleFactory.installModules();
// 增加jvm關閉事件
if (RpcConfigs.getOrDefaultValue(RpcOptions.JVM_SHUTDOWN_HOOK, true)) {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("SOFA RPC Framework catch JVM shutdown event, Run shutdown hook now.");
}
destroy(false);
}
}, "SOFA-RPC-ShutdownHook"));
}
}
在這個程式碼塊裡面會呼叫ModuleFactory初始化其他模組
ModuleFactory#installModules
public static void installModules() {
ExtensionLoader<Module> loader = ExtensionLoaderFactory.getExtensionLoader(Module.class);
//moduleLoadList 預設是 *
String moduleLoadList = RpcConfigs.getStringValue(RpcOptions.MODULE_LOAD_LIST);
for (Map.Entry<String, ExtensionClass<Module>> o : loader.getAllExtensions().entrySet()) {
String moduleName = o.getKey();
Module module = o.getValue().getExtInstance();
// judge need load from rpc option
if (needLoad(moduleLoadList, moduleName)) {
// judge need load from implement
if (module.needLoad()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Install Module: {}", moduleName);
}
//安裝模板
module.install();
INSTALLED_MODULES.put(moduleName, module);
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("The module " + moduleName + " does not need to be loaded.");
}
}
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("The module " + moduleName + " is not in the module load list.");
}
}
}
}
這裡會根據SPI初始化四個模組,分別是:
fault-tolerance
sofaTracer-resteasy
lookout
sofaTracer
我們這裡只講解fault-tolerance模組。
然後我們進入到FaultToleranceModule#install方法中
private Regulator regulator = new TimeWindowRegulator();
public void install() {
subscriber = new FaultToleranceSubscriber();
//註冊ClientSyncReceiveEvent和ClientAsyncReceiveEvent到匯流排中
EventBus.register(ClientSyncReceiveEvent.class, subscriber);
EventBus.register(ClientAsyncReceiveEvent.class, subscriber);
String regulatorAlias = RpcConfigs.getOrDefaultValue(RpcOptions.AFT_REGULATOR, "timeWindow");
regulator = ExtensionLoaderFactory.getExtensionLoader(Regulator.class).getExtension(regulatorAlias);
//呼叫TimeWindowRegulator的init方法
regulator.init();
}
這裡我們的訂閱者是FaultToleranceSubscriber例項,訂閱了兩個ClientSyncReceiveEvent和ClientAsyncReceiveEvent事件。
然後會呼叫regulator的實現類TimeWindowRegulator的初始化方法
TimeWindowRegulator#init
/**
* 度量策略(建立計算模型, 對計算模型裡的資料進行度量,選出正常和異常節點)
*/
private MeasureStrategy measureStrategy;
/**
* 計算策略(根據度量結果,判斷是否需要執行降級或者恢復)
*/
private RegulationStrategy regulationStrategy;
/**
* 降級策略: 例如調整權重
*/
private DegradeStrategy degradeStrategy;
/**
* 恢復策略:例如調整權重
*/
private RecoverStrategy recoverStrategy;
/**
* Listener for invocation stat change.
*/
private final InvocationStatListener listener = new TimeWindowRegulatorListener();
public void init() {
String measureStrategyAlias = RpcConfigs
.getOrDefaultValue(RpcOptions.AFT_MEASURE_STRATEGY, "serviceHorizontal");
String regulationStrategyAlias = RpcConfigs.getOrDefaultValue(RpcOptions.AFT_REGULATION_STRATEGY,
"serviceHorizontal");
String degradeStrategyAlias = RpcConfigs.getOrDefaultValue(RpcOptions.AFT_DEGRADE_STRATEGY, "weight");
String recoverStrategyAlias = RpcConfigs.getOrDefaultValue(RpcOptions.AFT_RECOVER_STRATEGY, "weight");
//ServiceHorizontalMeasureStrategy
measureStrategy = ExtensionLoaderFactory.getExtensionLoader(MeasureStrategy.class).getExtension(
measureStrategyAlias);
//ServiceHorizontalRegulationStrategy
regulationStrategy = ExtensionLoaderFactory.getExtensionLoader(RegulationStrategy.class).getExtension(
regulationStrategyAlias);
//WeightDegradeStrategy
degradeStrategy = ExtensionLoaderFactory.getExtensionLoader(DegradeStrategy.class).getExtension(
degradeStrategyAlias);
//WeightRecoverStrategy
recoverStrategy = ExtensionLoaderFactory.getExtensionLoader(RecoverStrategy.class).getExtension(
recoverStrategyAlias);
//TimeWindowRegulatorListener
InvocationStatFactory.addListener(listener);
}
這裡面主要是根據SPI初始化了度量策略,計算策略,降級策略,恢復策略,這些東西有什麼用,我們下面講。
觸發權重降級
我們在3. 原始碼分析---SOFARPC客戶端服務呼叫裡面講到了,客戶端在呼叫的時候最後會呼叫AbstractCluster#doSendMsg方法,然後根據不同的策略,同步、非同步、單向等呼叫然後返回response例項。
protected SofaResponse doSendMsg(ProviderInfo providerInfo, ClientTransport transport,
SofaRequest request) throws SofaRpcException {
....
// 同步呼叫
if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) {
long start = RpcRuntimeContext.now();
try {
//BoltClientTransport#syncSend
response = transport.syncSend(request, timeout);
} finally {
if (RpcInternalContext.isAttachmentEnable()) {
long elapsed = RpcRuntimeContext.now() - start;
context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed);
}
}
}
....
}
因為在故障模組註冊的時候訂閱了兩個ClientSyncReceiveEvent和ClientAsyncReceiveEvent事件。即一個同步事件和一個非同步事件,我們這裡挑同步呼叫進行講解。
在上面的程式碼片段中,我們看到了會呼叫到BoltClientTransport#syncSend。
BoltClientTransport#syncSend
public SofaResponse syncSend(SofaRequest request, int timeout) throws SofaRpcException {
//檢查連線
checkConnection();
RpcInternalContext context = RpcInternalContext.getContext();
InvokeContext boltInvokeContext = createInvokeContext(request);
SofaResponse response = null;
SofaRpcException throwable = null;
try {
//向匯流排發出ClientBeforeSendEvent事件
beforeSend(context, request);
response = doInvokeSync(request, boltInvokeContext, timeout);
return response;
} catch (Exception e) { // 其它異常
throwable = convertToRpcException(e);
throw throwable;
} finally {
//向匯流排發出ClientAfterSendEvent事件
afterSend(context, boltInvokeContext, request);
//向匯流排發出ClientSyncReceiveEvent事件
if (EventBus.isEnable(ClientSyncReceiveEvent.class)) {
//把當前被呼叫的provider和ConsumerConfig傳送到匯流排中去
EventBus.post(new ClientSyncReceiveEvent(transportConfig.getConsumerConfig(),
transportConfig.getProviderInfo(), request, response, throwable));
}
}
}
其實上面這麼一大段程式碼和我們這篇文章有關係的也就只要最後向匯流排傳送ClientSyncReceiveEvent事件而已。
匯流排傳送的時候會觸發訂閱者FaultToleranceSubscriber的onEvent方法。
我們進入到FaultToleranceSubscriber#onEvent中
public void onEvent(Event originEvent) {
Class eventClass = originEvent.getClass();
if (eventClass == ClientSyncReceiveEvent.class) {
//這裡會呼叫aftEnable
if (!FaultToleranceConfigManager.isEnable()) {
return;
}
// 同步結果
ClientSyncReceiveEvent event = (ClientSyncReceiveEvent) originEvent;
ConsumerConfig consumerConfig = event.getConsumerConfig();
ProviderInfo providerInfo = event.getProviderInfo();
InvocationStat result = InvocationStatFactory.getInvocationStat(consumerConfig, providerInfo);
if (result != null) {
//記錄呼叫次數
result.invoke();
Throwable t = event.getThrowable();
if (t != null) {
//記錄異常次數
result.catchException(t);
}
}
}
...
}
這裡我們忽略其他的事件,只留下ClientSyncReceiveEvent事件的處理流程。
在這裡我們又看到了InvocationStatFactory這個工廠類,在上面TimeWindowRegulator#init也用到了這個類。
在返回result之後會呼叫invoke方法,記錄一下客戶端呼叫服務端的次數,如果有異常,也會呼叫一下catchException方法,記錄一下異常的次數。這兩個引數會在做服務剔除的時候非同步做統計使用。
InvocationStatFactory#getInvocationStat
public static InvocationStat getInvocationStat(ConsumerConfig consumerConfig, ProviderInfo providerInfo) {
String appName = consumerConfig.getAppName();
if (appName == null) {
return null;
}
// 應用開啟單機故障摘除功能
if (FaultToleranceConfigManager.isRegulationEffective(appName)) {
return getInvocationStat(new InvocationStatDimension(providerInfo, consumerConfig));
}
return null;
}
public static InvocationStat getInvocationStat(InvocationStatDimension statDimension) {
//第一次的時候為空
InvocationStat invocationStat = ALL_STATS.get(statDimension);
if (invocationStat == null) {
//直接new一個例項放入到ALL_STATS變數中
invocationStat = new ServiceExceptionInvocationStat(statDimension);
InvocationStat old = ALL_STATS.putIfAbsent(statDimension, invocationStat);
if (old != null) {
invocationStat = old;
}
//LISTENERS在呼叫TimeWindowRegulator#init的時候add進來的,只有一個TimeWindowRegulatorListener
for (InvocationStatListener listener : LISTENERS) {
listener.onAddInvocationStat(invocationStat);
}
}
return invocationStat;
}
如果是第一次來到這個方法的話,那麼會例項化一個ServiceExceptionInvocationStat放入到ALL_STATS變數中,然後遍歷InvocationStatFactory的遍歷LISTENERS,呼叫監聽器的onAddInvocationStat方法。
LISTENERS裡面的例項是我們在TimeWindowRegulator#init方法裡面add進去的TimeWindowRegulatorListener。
注意,這裡用了兩個封裝類,都是接下來要用到的。分別是InvocationStatDimension和ServiceExceptionInvocationStat。
InvocationStatDimension
public class InvocationStatDimension {
/**
* One provider of service reference
*/
private final ProviderInfo providerInfo;
/**
* Config of service reference
*/
private final ConsumerConfig consumerConfig;
/**
* cache value: dimensionKey
*/
private transient String dimensionKey;
/**
* cache value : originWeight
*/
private transient Integer originWeight;
}
ServiceExceptionInvocationStat的結構圖:
ServiceExceptionInvocationStat
public class ServiceExceptionInvocationStat extends AbstractInvocationStat {
/**
* Instantiates a new Service exception invocation stat.
*
* @param invocation the invocation
*/
public ServiceExceptionInvocationStat(InvocationStatDimension invocation) {
super(invocation);
}
@Override
public long catchException(Throwable t) {
//統計異常次數
if (t instanceof SofaRpcException) {
SofaRpcException exception = (SofaRpcException) t;
if (exception.getErrorType() == RpcErrorType.CLIENT_TIMEOUT
|| exception.getErrorType() == RpcErrorType.SERVER_BUSY) {
return exceptionCount.incrementAndGet();
}
}
return exceptionCount.get();
}
}
然後直接看它父類的具體引數就好了
AbstractInvocationStat
public abstract class AbstractInvocationStat implements InvocationStat {
/**
* 統計維度
*/
protected final InvocationStatDimension dimension;
/**
* 呼叫次數
*/
protected final AtomicLong invokeCount = new AtomicLong(0L);
/**
* 異常次數
*/
protected final AtomicLong exceptionCount = new AtomicLong(0L);
/**
* when useless in one window, this value increment 1. <br />
* If this value is greater than threshold, this stat will be deleted.
*/
private final transient AtomicInteger uselessCycle = new AtomicInteger(0);
}
上面的這些引數,我們接下來還會用到。
權重降級具體實現
TimeWindowRegulatorListener是TimeWIndowRegulator的內部類。
class TimeWindowRegulatorListener implements InvocationStatListener {
@Override
public void onAddInvocationStat(InvocationStat invocationStat) {
//度量策略不為空
if (measureStrategy != null) {
//ServiceHorizontalMeasureStrategy
MeasureModel measureModel = measureStrategy.buildMeasureModel(invocationStat);
if (measureModel != null) {
measureModels.add(measureModel);
startRegulate();
}
}
}
@Override
public void onRemoveInvocationStat(InvocationStat invocationStat) {
if (measureStrategy != null) {
measureStrategy.removeMeasureModel(invocationStat);
}
}
}
這個監聽器裡面就是呼叫ServiceHorizontalMeasureStrategy#buildMeasureModel,返回調控模型。
我們先看一下MeasureModel裡面封裝了什麼:
MeasureModel
public class MeasureModel {
/**
* App name of measure model
* 服務名
*/
private final String appName;
/**
* service name of measure model
* 被呼叫的服務
*/
private final String service;
/**
* all dimension statics stats of measure model
* InvokeStat集合
*/
private final ConcurrentHashSet<InvocationStat> stats = new ConcurrentHashSet<InvocationStat>();
....
}
所以根據這幾個全域性變數,我們可以推測,MeasureModel應該是根據appName+service為維度,裡面有很多的InvocationStat。
我們再回到ServiceHorizontalMeasureStrategy#buildMeasureModel
public MeasureModel buildMeasureModel(InvocationStat invocationStat) {
InvocationStatDimension statDimension = invocationStat.getDimension();
//AppName + ":" + Service
String key = statDimension.getDimensionKey();
MeasureModel measureModel = appServiceMeasureModels.get(key);
if (measureModel == null) {
measureModel = new MeasureModel(statDimension.getAppName(), statDimension.getService());
MeasureModel oldMeasureModel = appServiceMeasureModels.putIfAbsent(key, measureModel);
if (oldMeasureModel == null) {
measureModel.addInvocationStat(invocationStat);
return measureModel;
} else {
oldMeasureModel.addInvocationStat(invocationStat);
return null;
}
} else {
measureModel.addInvocationStat(invocationStat);
return null;
}
}
buildMeasureModel方法裡面的做法也和我上面說的一樣。根據appName+service為維度封裝不同的invocationStat在MeasureModel裡面。
接著,回到TimeWindowRegulatorListener#onAddInvocationStat中,會往下呼叫startRegulate方法。
/**
* 度量執行緒池
*/
private final ScheduledService measureScheduler = new ScheduledService("AFT-MEASURE",
ScheduledService.MODE_FIXEDRATE,
new MeasureRunnable(), 1, 1,
TimeUnit.SECONDS);
public void startRegulate() {
if (measureStarted.compareAndSet(false, true)) {
measureScheduler.start();
}
}
ScheduledService是一個執行緒池,measureScheduler變數例項化了一個固定頻率執行延遲執行緒池,會每1秒鐘固定呼叫MeasureRunnable的run方法。
MeasureRunnable是TimeWindowRegulator的內部類:
private class MeasureRunnable implements Runnable {
@Override
public void run() {
measureCounter.incrementAndGet();
//遍歷TimeWindowRegulatorListener加入的MeasureModel例項
for (MeasureModel measureModel : measureModels) {
try {
//時間視窗是10,也就是說預設每過10秒才能進入下面的方法。
if (isArriveTimeWindow(measureModel)) {
//ServiceHorizontalMeasureStrategy
MeasureResult measureResult = measureStrategy.measure(measureModel);
regulationExecutor.submit(new RegulationRunnable(measureResult));
}
} catch (Exception e) {
LOGGER.errorWithApp(measureModel.getAppName(), "Error when doMeasure: " + e.getMessage(), e);
}
}
}
private boolean isArriveTimeWindow(MeasureModel measureModel) {
//timeWindow預設是10
long timeWindow = FaultToleranceConfigManager.getTimeWindow(measureModel.getAppName());
return measureCounter.get() % timeWindow == 0;
}
}
我們先來到ServiceHorizontalMeasureStrategy#measure來看看是怎麼判斷為異常或正常
如何判斷一個節點是異常還是正常
我們首先不看程式碼的實現,先白話的說明一下是如何實現的。
- 首先在FaultToleranceSubscriber#onEvent中收到同步或非同步結果事件後,就會從工廠中獲取這次呼叫的 InvokeStat(如果 InvokeStat 已經存在則直接返回,如果沒有則建立新的並保持到快取中)。通過呼叫 InvokeStat 的 invoke 和 catchException 方法統計呼叫次數和異常次數。
- 然後在MeasureRunnable方法中根據設定的視窗期,在到達視窗期的時候會從 MeasueModel 的各個 InvokeStat 建立一份映象資料,表示當前串列埠內的呼叫情況。
- 對所有的節點進行度量,計算出所有節點的平均異常率,如果某個節點的異常率大於平均異常率到一定比例,則判定為異常。
我這裡選用官方的例子來進行說明:
假如有三個節點,提供同一服務,呼叫次數和異常數如表格所示:
invokeCount | expCount | |
---|---|---|
invokeStat 1 | 5 | 4 |
invokeStat 2 | 10 | 1 |
invokeStat 3 | 10 | 0 |
結合上述例子,度量策略的大致邏輯如下:
- 首先統計該服務下所有 ip 的平均異常率,並用 averageExceptionRate 表示。平均異常率比較好理解,即異常總數 / 總呼叫次數,上例中 averageExceptionRate =(1 + 4) / (5 + 10 + 10) = 0.2.
- 當某個ip的視窗呼叫次數小於該服務的最小視窗呼叫次數( leastWindCount )則忽略並將狀態設定為 IGNOGRE。否則進行降級和恢復度量。 如 invokeStat 1 的 invokeCount 為5,如果 leastWindCount 設定為6 則 invokeStat 1 會被忽略。
- 當某個ip的 時間視窗內的異常率和服務平均異常比例 windowExceptionRate 大於 配置的 leastWindowExceptionRateMultiplte (最小時間視窗內異常率和服務平均異常率的降級比值),那麼將該IP設定為 ABNORMAL, 否則設定為 HEALTH.
windowExceptionRate 是異常率和服務平均異常比例,invokeStat 1 的異常率為 4/5 = 0.8, 則其對應的 windowExceptionRate = 0.8 / 0.2 = 4. 假設 leastWindowExceptionRateMultiplte =4, 那麼 invokeStat 1 是一次服務,則需要進行降級操作。
接下來我們來看具體的原始碼實現:
ServiceHorizontalMeasureStrategy#measure
public MeasureResult measure(MeasureModel measureModel) {
MeasureResult measureResult = new MeasureResult();
measureResult.setMeasureModel(measureModel);
String appName = measureModel.getAppName();
List<InvocationStat> stats = measureModel.getInvocationStats();
if (!CommonUtils.isNotEmpty(stats)) {
return measureResult;
}
//1
//這個方法主要是複製出一個當前時間點的呼叫情況,只統計被複制的InvocationStat
//如果有被新剔除的InvocationStat,則不會存在於該次獲取結果中。
List<InvocationStat> invocationStats = getInvocationStatSnapshots(stats);
//FaultToleranceConfig的timeWindow所設定的,時間視窗,預設是10
long timeWindow = FaultToleranceConfigManager.getTimeWindow(appName);
/* leastWindowCount在同一次度量中保持不變*/
//預設InvocationStat如果要參與統計的視窗內最低呼叫次數,時間視窗內,至少呼叫的次數.在時間視窗內總共都不足10,認為不需要調控.
long leastWindowCount = FaultToleranceConfigManager.getLeastWindowCount(appName);
//最小是1,也就是時間視窗內,只要呼叫了就進行統計
leastWindowCount = leastWindowCount < LEGAL_LEAST_WINDOW_COUNT ? LEGAL_LEAST_WINDOW_COUNT
: leastWindowCount;
//2.
/* 計算平均異常率和度量單個ip的時候都需要使用到appWeight*/
double averageExceptionRate = calculateAverageExceptionRate(invocationStats, leastWindowCount);
//表示當前機器是平均異常率的多少倍才降級,預設是6
double leastWindowExceptionRateMultiple = FaultToleranceConfigManager
.getLeastWindowExceptionRateMultiple(appName);
for (InvocationStat invocationStat : invocationStats) {
MeasureResultDetail measureResultDetail = null;
InvocationStatDimension statDimension = invocationStat.getDimension();
long windowCount = invocationStat.getInvokeCount();
//3
//這裡主要是根據Invocation的實際權重計算該Invocation的實際最小視窗呼叫次數
long invocationLeastWindowCount = getInvocationLeastWindowCount(invocationStat,
ProviderInfoWeightManager.getWeight(statDimension.getProviderInfo()),
leastWindowCount);
//4
//當總呼叫的次數為0的時候,averageExceptionRate =-1,這個時候可以設定為忽略
if (averageExceptionRate == -1) {
measureResultDetail = new MeasureResultDetail(statDimension, MeasureState.IGNORE);
} else {
if (invocationLeastWindowCount != -1 && windowCount >= invocationLeastWindowCount) {
//獲取異常率
double windowExceptionRate = invocationStat.getExceptionRate();
//沒有異常的情況,設定狀態為健康
if (averageExceptionRate == 0) {
measureResultDetail = new MeasureResultDetail(statDimension, MeasureState.HEALTH);
} else {
//5
//這裡主要是看這次被遍歷到invocationStat的異常率和平均異常率之比
double windowExceptionRateMultiple = CalculateUtils.divide(
windowExceptionRate, averageExceptionRate);
//如果當前的invocationStat的異常是平均異常的6倍,那麼就設定狀態為異常
measureResultDetail = windowExceptionRateMultiple >= leastWindowExceptionRateMultiple ?
new MeasureResultDetail(statDimension, MeasureState.ABNORMAL) :
new MeasureResultDetail(statDimension, MeasureState.HEALTH);
}
measureResultDetail.setAbnormalRate(windowExceptionRate);
measureResultDetail.setAverageAbnormalRate(averageExceptionRate);
measureResultDetail.setLeastAbnormalRateMultiple(leastWindowExceptionRateMultiple);
} else {
measureResultDetail = new MeasureResultDetail(statDimension, MeasureState.IGNORE);
}
}
measureResultDetail.setWindowCount(windowCount);
measureResultDetail.setTimeWindow(timeWindow);
measureResultDetail.setLeastWindowCount(invocationLeastWindowCount);
measureResult.addMeasureDetail(measureResultDetail);
}
//打日誌
logMeasureResult(measureResult, timeWindow, leastWindowCount, averageExceptionRate,
leastWindowExceptionRateMultiple);
InvocationStatFactory.updateInvocationStats(invocationStats);
return measureResult;
}
上面這個方法有點長,我給這個方法標註了數字,跟著數字標記去看。
- getInvocationStatSnapshots
public static List<InvocationStat> getInvocationStatSnapshots(List<InvocationStat> stats) {
List<InvocationStat> snapshots = new ArrayList<InvocationStat>(stats.size());
for (InvocationStat stat : stats) {
//賦值一個InvocationStat出來
InvocationStat snapshot = stat.snapshot();
//如果被呼叫的次數小於0
if (snapshot.getInvokeCount() <= 0) {
if (stat.getUselessCycle().incrementAndGet() > 6) {
// 6 個時間視窗無呼叫,刪除統計
InvocationStatFactory.removeInvocationStat(stat);
InvocationStatDimension dimension = stat.getDimension();
String appName = dimension.getAppName();
if (LOGGER.isDebugEnabled(appName)) {
LOGGER.debugWithApp(appName, "Remove invocation stat : {}, {} because of useless cycle > 6",
dimension.getDimensionKey(), dimension.getProviderInfo());
}
}
} else {
//如果被呼叫了,那麼就從新計數
stat.getUselessCycle().set(0);
snapshots.add(snapshot);
}
}
return snapshots;
}
//ServiceExceptionInvocationStat#snapshot
public InvocationStat snapshot() {
ServiceExceptionInvocationStat invocationStat = new ServiceExceptionInvocationStat(dimension);
invocationStat.setInvokeCount(getInvokeCount());
invocationStat.setExceptionCount(getExceptionCount());
return invocationStat;
}
首先 這個方法裡面首先是遍歷所有的InvocationStat,然後呼叫snapshot建立一個新的InvocationStat例項。
其次 校驗新的InvocationStat例項呼叫次數是不是小於等於0,如果是,說明沒有在時間視窗內沒有被呼叫過一次,那麼就再看是不是在6 個時間視窗無呼叫,如果是,那麼就刪除統計資料
然後返回新的InvocationStat集合
- calculateAverageExceptionRate
private double calculateAverageExceptionRate(List<InvocationStat> invocationStats, long leastWindowCount) {
long sumException = 0;
long sumCall = 0;
for (InvocationStat invocationStat : invocationStats) {
long invocationLeastWindowCount = getInvocationLeastWindowCount(invocationStat,
ProviderInfoWeightManager.getWeight(invocationStat.getDimension().getProviderInfo()),
leastWindowCount);
//統計所有的invocationStat被呼叫的次數,和異常次數
if (invocationLeastWindowCount != -1
&& invocationStat.getInvokeCount() >= invocationLeastWindowCount) {
sumException += invocationStat.getExceptionCount();
sumCall += invocationStat.getInvokeCount();
}
}
if (sumCall == 0) {
return -1;
}
//計算異常比率
return CalculateUtils.divide(sumException, sumCall);
}
private long getInvocationLeastWindowCount(InvocationStat invocationStat, Integer weight, long leastWindowCount) {
//目標地址原始權重
InvocationStatDimension statDimension = invocationStat.getDimension();
Integer originWeight = statDimension.getOriginWeight();
if (originWeight == 0) {
LOGGER.errorWithApp(statDimension.getAppName(), "originWeight is 0,but is invoked. service["
+ statDimension.getService() + "];ip["
+ statDimension.getIp() + "].");
return -1;
} else if (weight == null) { //如果地址還未被調控過或者已經恢復。
return leastWindowCount;
} else if (weight == -1) { //如果地址被剔除
return -1;
}
//這裡主要是根據Invocation的實際權重計算該Invocation的實際最小視窗呼叫次數
double rate = CalculateUtils.divide(weight, originWeight);
long invocationLeastWindowCount = CalculateUtils.multiply(leastWindowCount, rate);
return invocationLeastWindowCount < LEGAL_LEAST_WINDOW_COUNT ? LEGAL_LEAST_WINDOW_COUNT
: invocationLeastWindowCount;
}
這個方法總的來說就是遍歷所有的InvocationStat,然後求和說有的呼叫次數和異常次數,然後用(異常次數/呼叫次數)計算平均異常比率。
getInvocationLeastWindowCount方法主要是用來做校驗,如果原始的權重為0,或者為-1,那麼就返回-1。
因為當前的InvocationStat的權重可能被降權過,所以我們不能按原來的最小視窗呼叫次數來算,所以這裡需要乘以一個比率,然後看是不是小於LEGAL_LEAST_WINDOW_COUNT,返回際權重計算該Invocation的實際最小視窗呼叫次數。
- if判斷
我們在分析calculateAverageExceptionRate方法的時候看了,如果總的呼叫次數為0,那麼averageExceptionRate會為-1。代表所有的InvocationStat沒有被呼叫,我們設定忽略。
那麼接著往下走,會發現有一個averageExceptionRate是否為0的判斷,由於averageExceptionRate =(異常次數/呼叫次數),所以如果沒有異常的時候設定狀態為健康。
- windowExceptionRateMultipe
windowExceptionRateMultipe這個變數主要是用來看這次被遍歷到invocationStat的異常率和平均異常率之比。如果當前的(異常率/平均異常率)>=leastWindowExceptionRateMultiple,預設是6倍,那麼就設定當前的invocationStat為異常。
根據MeasureResult進行降權或恢復
呼叫完ServiceHorizontalMeasureStrategy#measure方法後會返回一個MeasureResult,然會新建一個RegulationRunnable例項,丟到regulationExecutor執行緒池中執行。
RegulationRunnable是TimeWeindowRegulator的內部類。
RegulationRunnable#run
RegulationRunnable(MeasureResult measureResult) {
this.measureResult = measureResult;
}
public void run() {
List<MeasureResultDetail> measureResultDetails = measureResult.getAllMeasureResultDetails();
for (MeasureResultDetail measureResultDetail : measureResultDetails) {
try {
doRegulate(measureResultDetail);
} catch (Exception e) {
LOGGER.errorWithApp(measureResult.getMeasureModel().getAppName(),
"Error when doRegulate: " + e.getMessage(), e);
}
}
}
RegulationRunnable會在run方法裡面遍歷所有的measureResult,然後呼叫doRegulate方法進行降權或恢復的處理
void doRegulate(MeasureResultDetail measureResultDetail) {
MeasureState measureState = measureResultDetail.getMeasureState();
InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension();
//預設是否進行降級 ,預設為否 ServiceHorizontalRegulationStrategy
boolean isDegradeEffective = regulationStrategy.isDegradeEffective(measureResultDetail);
if (isDegradeEffective) {
measureResultDetail.setLogOnly(false);
if (measureState.equals(MeasureState.ABNORMAL)) {
//這裡是為了以防對太多節點做了降權,所以預設限制只能最多給兩個節點降權
boolean isReachMaxDegradeIpCount = regulationStrategy.isReachMaxDegradeIpCount(measureResultDetail);
if (!isReachMaxDegradeIpCount) {
//降權 WeightDegradeStrategy
degradeStrategy.degrade(measureResultDetail);
} else {
String appName = measureResult.getMeasureModel().getAppName();
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGULATION_ABNORMAL_NOT_DEGRADE,
"Reach degrade number limit.", statDimension.getService(), statDimension.getIp(),
statDimension.getAppName()));
}
}
} else if (measureState.equals(MeasureState.HEALTH)) {
boolean isExistDegradeList = regulationStrategy.isExistInTheDegradeList(measureResultDetail);
if (isExistDegradeList) {
//恢復
recoverStrategy.recover(measureResultDetail);
regulationStrategy.removeFromDegradeList(measureResultDetail);
}
//沒有被降級過,因此不需要被恢復。
}
} else {
measureResultDetail.setLogOnly(true);
if (measureState.equals(MeasureState.ABNORMAL)) {
//這個時候呼叫degrade,主要是列印日誌用的
degradeStrategy.degrade(measureResultDetail);
String appName = measureResult.getMeasureModel().getAppName();
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGULATION_ABNORMAL_NOT_DEGRADE,
"Degrade switch is off", statDimension.getService(),
statDimension.getIp(), statDimension.getAppName()));
}
}
}
}
}
我們分兩種情況進行分析。
- 如果該節點是異常節點
首先會呼叫ServiceHorizontalRegulationStrategy#isReachMaxDegradeIpCount方法。
ServiceHorizontalRegulationStrategy#isReachMaxDegradeIpCount
public boolean isReachMaxDegradeIpCount(MeasureResultDetail measureResultDetail) {
InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension();
ConcurrentHashSet<String> ips = getDegradeProviders(statDimension.getDimensionKey());
String ip = statDimension.getIp();
if (ips.contains(ip)) {
return false;
} else {
//預設一個服務能夠調控的最大ip數
int degradeMaxIpCount = FaultToleranceConfigManager.getDegradeMaxIpCount(statDimension.getAppName());
ipsLock.lock();
try {
if (ips.size() < degradeMaxIpCount) {
ips.add(ip);
return false;
} else {
return true;
}
} finally {
ipsLock.unlock();
}
}
}
這個方法是為了能夠控制最多一個服務下面能調控多少個節點。比如一個服務下面只有3個節點,其中2個節點出了問題,通過調控解決了,那麼不可能將第三個節點也進行調控了吧,必須要進行人工干預了,為啥會出現這樣的問題。
然後會呼叫WeightDegradeStrategy#degrade對節點進行降權
WeightDegradeStrategy#degrade
public void degrade(MeasureResultDetail measureResultDetail) {
//呼叫LogPrintDegradeStrategy方法,列印日誌用
super.degrade(measureResultDetail);
if (measureResultDetail.isLogOnly()) {
return;
}
InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension();
String appName = statDimension.getAppName();
ProviderInfo providerInfo = statDimension.getProviderInfo();
// if provider is removed or provider is warming up
//如果為空,或是在預熱中,則直接返回
if (providerInfo == null || providerInfo.getStatus() == ProviderStatus.WARMING_UP) {
return;
}
//目前provider權重
int currentWeight = ProviderInfoWeightManager.getWeight(providerInfo);
//降權比重
double weightDegradeRate = FaultToleranceConfigManager.getWeightDegradeRate(appName);
//最少權重,預設為1
int degradeLeastWeight = FaultToleranceConfigManager.getDegradeLeastWeight(appName);
//權重比率 * 目前權重
int degradeWeight = CalculateUtils.multiply(currentWeight, weightDegradeRate);
//不能小於最小值
degradeWeight = degradeWeight < degradeLeastWeight ? degradeLeastWeight : degradeWeight;
// degrade weight of this provider info
boolean success = ProviderInfoWeightManager.degradeWeight(providerInfo, degradeWeight);
if (success && LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "the weight was degraded. serviceUniqueName:["
+ statDimension.getService() + "],ip:["
+ statDimension.getIp() + "],origin weight:["
+ currentWeight + "],degraded weight:["
+ degradeWeight + "].");
}
}
//ProviderInfoWeightManager
public static boolean degradeWeight(ProviderInfo providerInfo, int weight) {
providerInfo.setStatus(ProviderStatus.DEGRADED);
providerInfo.setWeight(weight);
return true;
}
這個方法實際上就是權重拿出來,然後根據比率進行設值並且不能小於最小的比重。
最後呼叫ProviderInfoWeightManager把當前的節點設值為DEGRADED,並設值新的權重。
- 如果是健康節點
呼叫ServiceHorizontalRegulationStrategy#isExistInTheDegradeList判斷一下當前節點有沒有被降級
ServiceHorizontalRegulationStrategy#isExistInTheDegradeList
public boolean isExistInTheDegradeList(MeasureResultDetail measureResultDetail) {
InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension();
ConcurrentHashSet<String> ips = getDegradeProviders(statDimension.getDimensionKey());
return ips != null && ips.contains(statDimension.getIp());
}
在呼叫isReachMaxDegradeIpCount方法的時候會把被降級的ip放入到ips集合中,所以這裡只要獲取就可以了。
如果該節點已被降級那麼呼叫WeightRecoverStrategy#recover進行恢復
WeightRecoverStrategy#recover
public void recover(MeasureResultDetail measureResultDetail) {
InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension();
ProviderInfo providerInfo = statDimension.getProviderInfo();
// if provider is removed or provider is warming up
if (providerInfo == null || providerInfo.getStatus() == ProviderStatus.WARMING_UP) {
return;
}
Integer currentWeight = ProviderInfoWeightManager.getWeight(providerInfo);
if (currentWeight == -1) {
return;
}
String appName = statDimension.getAppName();
//預設2
double weightRecoverRate = FaultToleranceConfigManager.getWeightRecoverRate(appName);
//也就是說一次只能恢復到2倍,不會一次性就恢復到originWeight
int recoverWeight = CalculateUtils.multiply(currentWeight, weightRecoverRate);
int originWeight = statDimension.getOriginWeight();
// recover weight of this provider info
if (recoverWeight >= originWeight) {
measureResultDetail.setRecoveredOriginWeight(true);
//將provider狀態設定為AVAILABLE,並且設定Weight
ProviderInfoWeightManager.recoverOriginWeight(providerInfo, originWeight);
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "the weight was recovered to origin value. serviceUniqueName:["
+ statDimension.getService() + "],ip:["
+ statDimension.getIp() + "],origin weight:["
+ currentWeight + "],recover weight:["
+ originWeight + "].");
}
} else {
measureResultDetail.setRecoveredOriginWeight(false);
boolean success = ProviderInfoWeightManager.recoverWeight(providerInfo, recoverWeight);
if (success && LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "the weight was recovered. serviceUniqueName:["
+ statDimension.getService() + "],ip:["
+ statDimension.getIp() + "],origin weight:["
+ currentWeight + "],recover weight:["
+ recoverWeight + "].");
}
}
}
這個方法很簡單,各位可以看看我上面的註釋。
總結
總的來說FaultToleranceModule分為兩部分:
- FaultToleranceSubscriber訂閱事件,負責訂閱同步和非同步結果事件
- 根據呼叫事件進行統計,以及內建的一些策略完成服務的降級和恢復操作。