SpringCloud升級之路2020.0.x版-32. 改進負載均衡演算法
在前面一節,我們梳理了實現 Feign 斷路器以及執行緒隔離的思路,這一節,我們先不看如何原始碼實現(因為原始碼中會包含負載均衡演算法的改進部分),先來討論下如何優化目前的負載均衡演算法。
之前的負載均衡演算法
- 獲取服務例項列表,將例項列表按照 ip 埠排序,如果不排序即使 position 是下一個可能也代表的是之前已經呼叫過的例項
- 根據請求中的 traceId,從本地快取中以 traceId 為 key 獲取一個初始值為隨機數的原子變數 position,這樣防止所有請求都從第一個例項開始呼叫,之後第二個、第三個這樣。
- position 原子加一,之後對例項個數取餘,返回對應下標的例項進行呼叫
其中請求包含 traceId 是來自於我們使用了 spring-cloud-sleuth 鏈路追蹤,基於這種機制我們能保證請求不會重試到之前已經呼叫過的例項。原始碼是:
//一定必須是實現ReactorServiceInstanceLoadBalancer //而不是ReactorLoadBalancer<ServiceInstance> //因為註冊的時候是ReactorServiceInstanceLoadBalancer @Log4j2 public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer { private final ServiceInstanceListSupplier serviceInstanceListSupplier; //每次請求算上重試不會超過1分鐘 //對於超過1分鐘的,這種請求肯定比較重,不應該重試 private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES) //隨機初始值,防止每次都是從第一個開始呼叫 .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000))); private final String serviceId; private final Tracer tracer; public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) { this.serviceInstanceListSupplier = serviceInstanceListSupplier; this.serviceId = serviceId; this.tracer = tracer; } //每次重試,其實都會呼叫這個 choose 方法重新獲取一個例項 @Override public Mono<Response<ServiceInstance>> choose(Request request) { return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances)); } private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } return getInstanceResponseByRoundRobin(serviceInstances); } private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } //為了解決原始演算法不同調用併發可能導致一個請求重試相同的例項 //從 sleuth 的 Tracer 中獲取當前請求的上下文 Span currentSpan = tracer.currentSpan(); //如果上下文不存在,則可能不是前端使用者請求,而是其他某些機制觸發,我們就建立一個新的上下文 if (currentSpan == null) { currentSpan = tracer.newTrace(); } //從請求上下文中獲取請求的 traceId,用來唯一標識一個請求 long l = currentSpan.context().traceId(); AtomicInteger seed = positionCache.get(l); int s = seed.getAndIncrement(); int pos = s % serviceInstances.size(); log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size()); return new DefaultResponse(serviceInstances.stream() //例項返回列表順序可能不同,為了保持一致,先排序再取 .sorted(Comparator.comparing(ServiceInstance::getInstanceId)) .collect(Collectors.toList()).get(pos)); } }
但是在這次請求突增很多的時候,這種負載均衡演算法還是給我們帶來了問題。
首先,本次突增,我們並沒有採取擴容,導致本次的效能壓力對於壓力的均衡分佈非常敏感。舉個例子是,假設微服務 A 有 9 個例項,在業務高峰點來的時候,最理想的情況是保證無論何時這 9 個負載壓力都完全均衡,但是由於我們使用了初始值為隨機數的原子變數 position,雖然從一天的總量上來看,負責均衡壓力肯定是均衡,但是在某一小段時間內,很可能壓力全都跑到了某幾個例項上,導致這幾個例項被壓垮,熔斷,然後又都跑到了另外的幾個例項上,又被壓垮,熔斷,如此惡性迴圈。
然後,我們部署採用的是 k8s 部署,同一個虛擬機器上面可能會跑很多微服務的 pod。在某些情況下,同一個微服務的多個 pod 可能會跑到同一個虛擬機器 Node 上,這個可以從pod 的 ip 網段上看出來
最後,如果呼叫某個例項一直失敗,那麼這個例項的呼叫優先順序需要排在其他正常的例項後面。這個對於減少快速刷新發布(一下子啟動很多例項之後停掉多個老例項,例項個數大於重試次數配置)對於使用者的影響,以及某個可用區突然發生異常導致多個例項下線對使用者的影響,以及業務壓力已經過去,壓力變小後,需要關掉不再需要的例項,導致大量例項發生遷移的時候對使用者的影響,有很大的作用。
針對以上問題的優化方案
我們針對上面三個問題,提出了一種優化後的解決方案:
- 針對每次請求,記錄:
- 本次請求已經呼叫過哪些例項 -> 請求呼叫過的例項快取
- 呼叫的例項,當前有多少請求在處理中 -> 例項執行請求數
- 呼叫的例項,最近請求錯誤率 -> 例項請求錯誤率
- 隨機將例項列表打亂,防止在以上三個指標都相同時,總是將請求發給同一個例項。
- 按照 當前請求沒有呼叫過靠前 -> 錯誤率越小越靠前 的順序排序 -> 例項執行請求數越小越靠前
- 取排好序之後的列表第一個例項作為本次負載均衡的例項
具體實現是:以下的程式碼來自於:https://github.com/JoJoTec/spring-cloud-parent
我們使用了依賴:
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
記錄例項資料的快取類:
@Log4j2
public class ServiceInstanceMetrics {
private static final String CALLING = "-Calling";
private static final String FAILED = "-Failed";
private MetricRegistry metricRegistry;
ServiceInstanceMetrics() {
}
public ServiceInstanceMetrics(MetricRegistry metricRegistry) {
this.metricRegistry = metricRegistry;
}
/**
* 記錄呼叫例項
* @param serviceInstance
*/
public void recordServiceInstanceCall(ServiceInstance serviceInstance) {
String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
metricRegistry.counter(key + CALLING).inc();
}
/**
* 記錄呼叫例項結束
* @param serviceInstance
* @param isSuccess 是否成功
*/
public void recordServiceInstanceCalled(ServiceInstance serviceInstance, boolean isSuccess) {
String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
metricRegistry.counter(key + CALLING).dec();
if (!isSuccess) {
//不成功則記錄失敗
metricRegistry.meter(key + FAILED).mark();
}
}
/**
* 獲取正在執行的呼叫次數
* @param serviceInstance
* @return
*/
public long getCalling(ServiceInstance serviceInstance) {
String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
long count = metricRegistry.counter(key + CALLING).getCount();
log.debug("ServiceInstanceMetrics-getCalling: {} -> {}", key, count);
return count;
}
/**
* 獲取最近一分鐘呼叫失敗次數分鐘速率,其實是滑動平均數
* @param serviceInstance
* @return
*/
public double getFailedInRecentOneMin(ServiceInstance serviceInstance) {
String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
double rate = metricRegistry.meter(key + FAILED).getOneMinuteRate();
log.debug("ServiceInstanceMetrics-getFailedInRecentOneMin: {} -> {}", key, rate);
return rate;
}
}
負載均衡核心程式碼:
private final LoadingCache<Long, Set<String>> calledIpPrefixes = Caffeine.newBuilder()
.expireAfterAccess(3, TimeUnit.MINUTES)
.build(k -> Sets.newConcurrentHashSet());
private final String serviceId;
private final Tracer tracer;
private final ServiceInstanceMetrics serviceInstanceMetrics;
//每次重試,其實都會呼叫這個 choose 方法重新獲取一個例項
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
Span span = tracer.currentSpan();
return serviceInstanceListSupplier.get().next()
.map(serviceInstances -> {
//保持 span 和呼叫 choose 的 span 一樣
try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
return getInstanceResponse(serviceInstances);
}
});
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
if (serviceInstances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
}
//讀取 spring-cloud-sleuth 的對於當前請求的鏈路追蹤上下文,獲取對應的 traceId
Span currentSpan = tracer.currentSpan();
if (currentSpan == null) {
currentSpan = tracer.newTrace();
}
long l = currentSpan.context().traceId();
return getInstanceResponseByRoundRobin(l, serviceInstances);
}
@VisibleForTesting
public Response<ServiceInstance> getInstanceResponseByRoundRobin(long traceId, List<ServiceInstance> serviceInstances) {
//首先隨機打亂列表中例項的順序
Collections.shuffle(serviceInstances);
//需要先將所有引數快取起來,否則 comparator 會呼叫多次,並且可能在排序過程中引數發生改變(針對例項的請求統計資料一直在併發改變)
Map<ServiceInstance, Integer> used = Maps.newHashMap();
Map<ServiceInstance, Long> callings = Maps.newHashMap();
Map<ServiceInstance, Double> failedInRecentOneMin = Maps.newHashMap();
serviceInstances = serviceInstances.stream().sorted(
Comparator
//之前已經呼叫過的網段,這裡排後面
.<ServiceInstance>comparingInt(serviceInstance -> {
return used.computeIfAbsent(serviceInstance, k -> {
return calledIpPrefixes.get(traceId).stream().anyMatch(prefix -> {
return serviceInstance.getHost().contains(prefix);
}) ? 1 : 0;
});
})
//當前錯誤率最少的
.thenComparingDouble(serviceInstance -> {
return failedInRecentOneMin.computeIfAbsent(serviceInstance, k -> {
double value = serviceInstanceMetrics.getFailedInRecentOneMin(serviceInstance);
//由於使用的是移動平均值(EMA),需要忽略過小的差異(保留兩位小數,不是四捨五入,而是直接捨棄)
return ((int) (value * 100)) / 100.0;
});
})
//當前負載請求最少的
.thenComparingLong(serviceInstance -> {
return callings.computeIfAbsent(serviceInstance, k ->
serviceInstanceMetrics.getCalling(serviceInstance)
);
})
).collect(Collectors.toList());
if (serviceInstances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
}
ServiceInstance serviceInstance = serviceInstances.get(0);
//記錄本次返回的網段
calledIpPrefixes.get(traceId).add(serviceInstance.getHost().substring(0, serviceInstance.getHost().lastIndexOf(".")));
//目前記錄這個只為了相容之前的單元測試(呼叫次數測試)
positionCache.get(traceId).getAndIncrement();
return new DefaultResponse(serviceInstance);
}
對於記錄例項資料的快取何時更新,是在 FeignClient 粘合重試,斷路以及執行緒隔離的程式碼中的,這個我們下一節就會看到。
一些組內關於方案設計的取捨 Q&A
1. 為何沒有使用所有微服務共享的快取來儲存呼叫資料,來讓這些資料更加準確?
共享快取的可選方案包括將這些資料記錄放入 Redis,或者是 Apache Ignite 這樣的記憶體網格中。但是有兩個問題:
- 如果資料記錄放入 Redis 這樣的額外儲存,如果 Redis 不可用會導致所有的負載均衡都無法執行。如果放入 Apache Ignite,如果對應的節點下線,那麼對應的負載均衡也無法執行。這些都是不能接受的。
- 假設微服務 A 需要呼叫微服務 B,可能 A 的某個例項呼叫 B 的某個例項有問題,但是 A 的其他例項呼叫 B 的這個例項卻沒有問題,例如當某個可用區與另一個可用區網路擁塞的時候。如果用同一個快取 Key 記錄 A 所有的例項呼叫 B 這個例項的資料,顯然是不準確的。
每個微服務使用本地快取,記錄自己呼叫其他例項的資料,在我們這裡看來,不僅是更容易實現,也是更準確的做法。
2. 採用 EMA 的方式而不是請求視窗的方式統計最近錯誤率
採用請求視窗的方式統計,肯定是最準確的,例如我們統計最近一分鐘的錯誤率,就將最近一分鐘的請求快取起來,讀取的時候,將快取起來的請求資料加在一起取平均數即可。但是這種方式在請求突增的時候,可能會佔用很多很多記憶體來快取這些請求。同時計算錯誤率的時候,隨著快取請求數的增多也會消耗更大量的 CPU 進行計算。這樣做很不值得。
EMA 這種滑動平均值的計算方式,常見於各種效能監控統計場景,例如 JVM 中 TLAB 大小的動態計算,G1 GC Region 大小的伸縮以及其他很多 JVM 需要動態得出合適值的地方,都用這種計算方式。他不用將請求快取起來,而是直接用最新值乘以一個比例之後加上老值乘以 (1 - 這個比例),這個比例一般高於 0.5,表示 EMA 和當前最新值更加相關。
但是 EMA 也帶來另一個問題,我們會發現隨著程式執行小數點位數會非常多,會看到類似於如下的值:0.00000000123, 0.120000001, 0.120000003, 為了忽略過於細緻差異的影響(其實這些影響也來自於很久之前的錯誤請求),我們只保留兩位小數進行排序。
微信搜尋“我的程式設計喵”關注公眾號,每日一刷,輕鬆提升技術,斬獲各種offer: