Ribbon負載均衡策略詳解
目前主流的負載方案分為兩種,一種是集中式負載均衡,在消費者和服務提供方中間使用獨立的代理方式進行負載,有硬體的,比如F5,也有軟體的,比如Nginx。
另一種則是客戶端自己做負載均衡,根據自己的請求情況做負載,Ribbon就是屬於客戶端自己做負載的。
一句話介紹那就是Ribbon是Netflix開源的一款用於客戶端負載均衡的工具軟體。GitHub地址:https://github.com/Netflix/ribbon。
Ribbon預設的策略是輪詢,我們可以自定義負載策略來覆蓋預設的,當然也可以通過配置指定使用哪些策略。
Ribbon架構圖
Ribbon作為後端負載均衡器,比
以隨機訪問策略:
1、ribbon配置檔案新增:
service-B.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RandomRule
其中service-B是我註冊到Eureka的serviceID,一共起了3個示例。
2、main類註冊:
@Bean @LoadBalanced RestTemplate restTemplate() { return new RestTemplate(); } @Bean public IRule ribbonRule() { return new RandomRule();//這裡配置策略,和配置檔案對應 }
一定記得加第二個註冊,裡面配具體的策略。
3、Controller:
@RestController public class ConsumerController { @Autowired private RestTemplate restTemplate; @Autowired private LoadBalancerClient loadBalancerClient; @RequestMapping(value = "/add", method = RequestMethod.GET) public String add(@RequestParam Integer a,@RequestParam Integer b) { this.loadBalancerClient.choose("service-B");//隨機訪問策略 return restTemplate.getForEntity("http://service-B/add?a="+a+"&b="+b, String.class).getBody(); } }
客戶端負載均衡Spring Cloud Ribbon
Spring Cloud Ribbon是一個基於HTTP和TCP的客戶端負載均衡工具,基於Netflix Ribbon實現。
目錄
- 客戶端負載均衡
- 原始碼分析
- 負載均衡器
- 負載均衡策略(本文重點)
- 配置詳解
- 自動化配置
負載均衡器
負載均衡器相關內容見上一篇文章
負載均衡策略
AbstractLoadBalancerRule
負載均衡策略的抽象類,在該抽象類中定義了負載均衡器ILoadBalancer物件,該物件能夠在具體實現選擇服務策略時,獲取到一些負載均衡器中維護的資訊作為分配依據,並以此設計一些演算法來實現針對特定場景的高效策略。
package com.netflix.loadbalancer;
import com.netflix.client.IClientConfigAware;
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
private ILoadBalancer lb;
@Override
public void setLoadBalancer(ILoadBalancer lb){
this.lb = lb;
}
@Override
public ILoadBalancer getLoadBalancer(){
return lb;
}
}
RandomRule
該策略實現了從服務例項清單中隨機選擇一個服務例項的功能。下面先看一下原始碼:
package com.netflix.loadbalancer;
import java.util.List;
import java.util.Random;
import com.netflix.client.config.IClientConfig;
public class RandomRule extends AbstractLoadBalancerRule {
Random rand;
public RandomRule() {
rand = new Random();
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
if (Thread.interrupted()) {
return null;
}
List<Server> upList = lb.getReachableServers();
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
/*
* No servers. End regardless of pass, because subsequent passes
* only get more restrictive.
*/
return null;
}
int index = rand.nextInt(serverCount);
server = upList.get(index);
if (server == null) {
/*
* The only time this should happen is if the server list were
* somehow trimmed. This is a transient condition. Retry after
* yielding.
*/
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Shouldn't actually happen.. but must be transient or a bug.
server = null;
Thread.yield();
}
return server;
}
@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
// TODO Auto-generated method stub
}
}
分析原始碼可以看出,IRule介面中Server choose(Object key)函式的實現委託給了該類中的Server choose(ILoadBalancer lb, Object key)函式,該方法增加了一個負載均衡器引數。從具體的實現可以看出,它會使用負載均衡器來獲得可用例項列表upList和所有的例項列表allList,並且使用rand.nextInt(serverCount)函式來獲取一個隨機數,並將該隨機數作為upList的索引值來返回具體例項。同時,具體的選擇邏輯在一個while (server == null)迴圈之內,而根據選擇邏輯的實現,正常情況下每次都應該選出一個服務例項,如果出現死迴圈獲取不到服務例項時,則很有可能存在併發的Bug。
RoundRobinRule
該策略實現了按照線性輪詢的方式依次選擇每個服務例項的功能。下面看一下原始碼:
package com.netflix.loadbalancer;
import com.netflix.client.config.IClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class RoundRobinRule extends AbstractLoadBalancerRule {
private AtomicInteger nextServerCyclicCounter;
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
/**
* Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
*
* @param modulo The modulo to bound the value of the counter.
* @return The next value.
*/
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get();
int next = (current + 1) % modulo;
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
}
RoundRobinRule具體實現和RandomRule類似,但是迴圈條件和從可用列表獲取例項的邏輯不同。迴圈條件中增加了一個count計數變數,該變數會在每次迴圈之後累加,如果迴圈10次還沒獲取到Server,就會結束,並列印一個警告資訊No available alive servers after 10 tries from load balancer:...。
線性輪詢的實現是通過AtomicInteger nextServerCyclicCounter物件實現,每次進行例項選擇時通過呼叫int incrementAndGetModulo(int modulo)方法來實現。
RetryRule
該策略實現了一個具備重試機制的例項選擇功能。從原始碼中可以看出,內部定義了一個IRule物件,預設是RoundRobinRule例項,choose方法中則實現了對內部定義的策略進行反覆嘗試的策略,若期間能夠選擇到具體的服務例項就返回,若選擇不到並且超過設定的嘗試結束時間(maxRetryMillis引數定義的值 + choose方法開始執行的時間戳)就返回null。
package com.netflix.loadbalancer;
import com.netflix.client.config.IClientConfig;
public class RetryRule extends AbstractLoadBalancerRule {
IRule subRule = new RoundRobinRule();
long maxRetryMillis = 500;
/*
* Loop if necessary. Note that the time CAN be exceeded depending on the
* subRule, because we're not spawning additional threads and returning
* early.
*/
public Server choose(ILoadBalancer lb, Object key) {
long requestTime = System.currentTimeMillis();
long deadline = requestTime + maxRetryMillis;
Server answer = null;
answer = subRule.choose(key);
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
InterruptTask task = new InterruptTask(deadline
- System.currentTimeMillis());
while (!Thread.interrupted()) {
answer = subRule.choose(key);
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
/* pause and retry hoping it's transient */
Thread.yield();
} else {
break;
}
}
task.cancel();
}
if ((answer == null) || (!answer.isAlive())) {
return null;
} else {
return answer;
}
}
}
WeightedResponseTimeRule
該策略是對RoundRobinRule的擴充套件,增加了根據例項的執行情況來計算權重,並根據權重來挑選例項,以達到更優的分配效果。它的實現主要有三個核心內容。
定時任務
WeightedResponseTimeRule策略在初始化的時候會通過serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval)啟動一個定時任務,用來為每個服務例項計算權重,該任務預設30s執行一次。
權重計算
在原始碼中我們可以輕鬆找到用於儲存權重的物件private volatile List<Double> accumulatedWeights = new ArrayList<Double>();該List中每個權重值所處的位置對應了負載均衡器維護的服務例項清單中所有例項在清單中的位置。下面看一下權重計算函式maintainWeights的原始碼:
public void maintainWeights() {
ILoadBalancer lb = getLoadBalancer();
if (lb == null) {
return;
}
if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
return;
}
try {
logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// no statistics, nothing to do
return;
}
double totalResponseTime = 0;
// find maximal 95% response time
for (Server server : nlb.getAllServers()) {
// this will automatically load the stats if not in cache
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
// weight for each server is (sum of responseTime of all servers - responseTime)
// so that the longer the response time, the less the weight and the less likely to be chosen
Double weightSoFar = 0.0;
// create new list and hot swap the reference
List<Double> finalWeights = new ArrayList<Double>();
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
double weight = totalResponseTime - ss.getResponseTimeAvg();
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
setWeights(finalWeights);
} catch (Exception e) {
logger.error("Error calculating server weights", e);
} finally {
serverWeightAssignmentInProgress.set(false);
}
}
該方法的實現主要分為兩個步驟:
- 根據LoadBalancerStats中記錄的每個例項的統計資訊,累加所有例項的平均響應時間,得到總平均響應時間totalResponseTime,該值會用於後續的計算。
- 為負載均衡器中維護的例項清單逐個計算權重(從第一個開始),計算規則為weightSoFar + totalResponseTime - 例項的平均響應時間,其中weightSoFar初始化為0,並且每計算好一個權重需要累加到weightSoFar上供下一次計算使用。
通過概算計算出來的權重值只是代表了各例項權重區間的上限。下面圖節選自Spring Cloud 微服務實戰。
例項選擇
下面看一下Server choose(ILoadBalancer lb, Object key)如何選擇Server的
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
// get hold of the current reference in case it is changed from the other thread
List<Double> currentWeights = accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
return null;
}
int serverIndex = 0;
// last one in the list is the sum of all weights
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// No server has been hit yet and total weight is not initialized
// fallback to use round robin
if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
server = super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
double randomWeight = random.nextDouble() * maxTotalWeight;
// pick the server index based on the randomIndex
int n = 0;
for (Double d : currentWeights) {
if (d >= randomWeight) {
serverIndex = n;
break;
} else {
n++;
}
}
server = allList.get(serverIndex);
}
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Next.
server = null;
}
return server;
}
下面我們看一下原始碼的主要步驟有:
- 首先先獲取accumulatedWeights中最後一個權重,如果該權重小於0.001或者例項的數量不等於權重列表的數量,就採用父類的線性輪詢策略
- 如果滿足條件,就先產生一個[0,最大權重值)區間內的隨機數
- 遍歷權重列表,比較權重值與隨機數的大小,如果權重值大於等於隨機數,就拿當前權重列表的索引值去服務例項列表獲取具體的例項。
細心的可能會發現第一個服務例項的權重區間是雙閉,最後一個服務例項的權重區間是雙開,其他服務例項的區間都是左開右閉。這是因為隨機數的最小值可以為0,所以第一個例項下限是閉區間,同時隨機數的最大值取不到最大權重值,所以最後一個例項的上限是開區間。
ClientConfigEnabledRoundRobinRule
該策略比較特殊,一般不直接使用它。因為他本身並沒有實現特殊的處理邏輯,在他內部定義了一個RoundRobinRule策略,choose函式的實現其實就是採用了RoundRobinRule的線性輪詢機制。
在實際開發中,我們並不會直接使用該策略,而是基於它做高階策略擴充套件。
BestAvailableRule
該策略繼承自ClientConfigEnabledRoundRobinRule,在實現中它注入了負載均衡器的統計物件LoadBalancerStats,同時在choose方法中利用LoadBalancerStats儲存的例項統計資訊來選擇滿足要求的服務例項。
當LoadBalancerStats為空時,會使用RoundRobinRule線性輪詢策略,當有LoadBalancerStats時,會通過遍歷負載均衡器中維護的所有服務例項,會過濾掉故障的例項,並找出併發請求數最小的一個。
該策略的特性是可以選出最空閒的服務例項。
PredicateBasedRule
這是一個抽象策略,它繼承了ClientConfigEnabledRoundRobinRule,從命名中可以猜出這是一個基於Predicate實現的策略,Predicate是Google Guava Collection工具對集合進行過濾的條件介面。
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
在該原始碼中,它定義了一個抽象函式getPredicate來獲取AbstractServerPredicate物件的實現,在choose方法中,通過AbstractServerPredicate的chooseRoundRobinAfterFiltering函式來選擇具體的服務例項。從該方法的命名我們可以看出大致的邏輯:首先通過子類中實現的Predicate邏輯來過濾一部分服務例項,然後再以線性輪詢的方式從過濾後的例項清單中選出一個。
在上面choose函式中呼叫的chooseRoundRobinAfterFiltering方法先通過內部定義的getEligibleServers函式來獲取備選的例項清單(實現了過濾),如果返回的清單為空,則用Optional.absent來表示不存在,反之則以線性輪詢的方式從備選清單中獲取一個例項。
下面看一下getEligibleServers方法的原始碼
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
if (loadBalancerKey == null) {
return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
List<Server> results = Lists.newArrayList();
for (Server server: servers) {
if (this.apply(new PredicateKey(loadBalancerKey, server))) {
results.add(server);
}
}
return results;
}
}
上述原始碼的大致邏輯是遍歷服務清單,使用this.apply方法來判斷例項是否需要保留,如果是就新增到結果列表中。
實際上,AbstractServerPredicate實現了com.google.common.base.Predicate介面,apply方法是介面中的定義,主要用來實現過濾條件的判斷邏輯,它輸入的引數則是過濾條件需要用到的一些資訊(比如原始碼中的new PredicateKey(loadBalancerKey, server)),傳入了關於例項的統計資訊和負載均衡器的選擇演算法傳遞過來的key。
AbstractServerPredicate沒有apply的實現,所以這裡的chooseRoundRobinAfterFiltering方法只是定義了一個模板策略:先過濾清單,再輪詢選擇。
對於如何過濾,需要在AbstractServerPredicate的子類中實現apply方法來確定具體的過濾策略。
AvailabilityFilteringRule
&emsps;該類繼承自PredicateBasedRule,遵循了先過濾清單,再輪詢選擇的基本處理邏輯,其中過濾條件使用了AvailabilityPredicate,下面看一下AvailabilityPredicate的原始碼:
package com.netflix.loadbalancer;
import javax.annotation.Nullable;
import com.netflix.client.config.IClientConfig;
import com.netflix.config.ChainedDynamicProperty;
import com.netflix.config.DynamicBooleanProperty;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
public class AvailabilityPredicate extends AbstractServerPredicate {
@Override
public boolean apply(@Nullable PredicateKey input) {
LoadBalancerStats stats = getLBStats();
if (stats == null) {
return true;
}
return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}
private boolean shouldSkipServer(ServerStats stats) {
if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
|| stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
return true;
}
return false;
}
}
從上面的原始碼可以看出,主要過的過濾邏輯都是在boolean shouldSkipServer(ServerStats stats)方法中實現,該方法主要判斷服務例項的兩項內容:
- 是否故障,即斷路由器是否生效已斷開
- 例項的併發請求數大於閥值,預設值2^32 - 1,該配置可以通過引數<clientName>.<nameSpace>.ActiveConnectionsLimit來修改
上面兩項只要滿足一項,apply方法就返回false,代表該服務例項可能存在故障或負載過高,都不滿足就返回true。
在AvailabilityFilteringRule進行例項選擇時做了小小的優化,它並沒有向父類一樣先遍歷所有的節點進行過濾,然後在過濾後的集合中選擇例項。而是先以線性的方式選擇一個例項,接著使用過濾條件來判斷該例項是否滿足要求,若滿足就直接使用該例項,若不滿足要求就再選擇下一個例項,檢查是否滿足要求,這個過程迴圈10次如果還沒有找到合適的服務例項,就採用父類的實現方案。
該策略通過線性輪詢的方式直接嘗試尋找可用且比較空閒的例項來用,優化了每次都要遍歷所有例項的開銷。
ZoneAvoidanceRule
該類也是PredicateBasedRule的子類,它的實現是通過組合過濾條件CompositePredicate,以ZoneAvoidancePredicate為主過濾條件,以AvailabilityPredicate為次過濾條件。
ZoneAvoidanceRule的實現並沒有像AvailabilityFilteringRule重寫choose函式來優化,所以它遵循了先過濾清單再輪詢選擇的基本邏輯。
下面看一下CompositePredicate的原始碼
package com.netflix.loadbalancer;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Lists;
public class CompositePredicate extends AbstractServerPredicate {
private AbstractServerPredicate delegate;
private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();
private int minimalFilteredServers = 1;
private float minimalFilteredPercentage = 0;
@Override
public boolean apply(@Nullable PredicateKey input) {
return delegate.apply(input);
}
@Override
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
Iterator<AbstractServerPredicate> i = fallbacks.iterator();
while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
&& i.hasNext()) {
AbstractServerPredicate predicate = i.next();
result = predicate.getEligibleServers(servers, loadBalancerKey);
}
return result;
}
}
從原始碼中可以看出,CompositePredicate定義了一個主過濾條件delegate和一組過濾條件列表fallbacks,次過濾條件的過濾順序是按儲存順序執行的。
在獲取結果的getEligibleServers函式中的主要邏輯是:
- 使用主過濾條件對所有例項過濾並返回過濾後的例項清單
- 每次使用次過濾條件過濾前,都要判斷兩個條件,一個是過濾後的例項總數 >= 最小過濾例項數(minimalFilteredServers,預設值為1),另一個是過濾後的例項比例 > 最小過濾百分比(minimalFilteredPercentage,預設為0),只要有一個不符合就不再進行過濾,將當前服務例項列表返回
- 依次使用次過濾條件列表中的過濾條件對主過濾條件的過濾結果進行過濾。