Ribbon 負載均衡器 LoadBalancer 原始碼解析
前言
Ribbon 負載均衡器
Ribbon 負載均衡器流程圖
Ribbon 負載均衡器實現原理
判斷服務的可用性
根據負載均衡策略 IRule 來選擇一個可用的服務
初始化獲取所有服務列表
前言
什麼是負載均衡?簡單來說一個應用,後臺有多個服務來支撐,即利用多臺伺服器提供單一服務,當某個服務掛了或者負載過高的時候,負載均衡器能夠選擇其他的服務來處理請求,用來提高應用的高可用性和高併發行;此外,當用戶請求過來的時候,負載均衡器會將請求後臺內網伺服器,內網伺服器將請求的響應返回給負載平衡器,負載平衡器再將響應傳送到使用者,這樣可以阻止了使用者直接訪問後臺伺服器,使得伺服器更加安全。
可參考 維基百科-負載均衡
Ribbon 負載均衡器
Ribbon 的負載均衡器是通過 LoadBalancerClient 來實現的,在應用啟動的時候,LoadBalancerClient 預設會從 EurekaClient 獲取服務列表,並將服務註冊列表快取在本地,當呼叫 LoadBalancerClient 的 choose() 方法的時候, 根據負載均衡策略 IRule 來選擇一個可用的服務,從而實現負載均衡。
當然,LoadBalancerClient 也可以不從 EurekaClient 中獲取服務列表,這是需要自己維護一個服務註冊列表資訊,具體操作如下:
ribbon:
eureka:
enabled: false
stores:
ribbon:
listOfServers: baidu.com, google.com
Ribbon 負載均衡器流程圖
主要流程:
1. 當應用啟動的時候,ILoadBalancer 從 EurekaClient 獲取服務列表
2. 然後每 10 秒 向 EurekaClient 傳送一次心跳檢測,如果註冊列表發生了變化,則更新獲取重新獲取
3. LoadBalancerClient 呼叫 choose() 方法來選擇服務的時候,會呼叫 ILoadBalancer 的 chooseServer() 來獲取一個可以的服務,
4. 在 ILoadBalancer 進行獲取服務的時候,會根據負載均衡策略 IRule 來進行選擇
5. 返回可用的服務
Ribbon 負載均衡器實現原理
下面就來看看每個類的實現原理
RibbonLoadBalancerClient
RibbonLoadBalancerClient 它是 Ribbon 負載均衡實現的一個重要的類,最終的負載均衡的請求處理都由它來執行,先來看下它的類圖:
它實現了 LoadBalancerClient 介面,而 LoadBalancerClient 介面實現了 ServiceInstanceChooser 介面:
ServiceInstanceChooser
該介面用來從負載均衡器中獲取一個可用的服務,只有一個方法:
public interface ServiceInstanceChooser {
/**
* @param serviceId:服務ID
* @return 可用的服務例項
*/
ServiceInstance choose(String serviceId);
}
LoadBalancerClient
表示負載均衡的客戶端,是一個介面,繼承了 ServiceInstanceChooser 介面 ,共有三個方法:
public interface LoadBalancerClient extends ServiceInstanceChooser {
/**
* 執行請求
* @param serviceId :用於查詢 LoadBalancer的服務ID
* @param request:允許實現執行前後操作
*/
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
/**
* 執行請求
* @param serviceId :用於查詢 LoadBalancer的服務ID
* @param serviceInstance :執行請求的服務
* @param request:允許實現執行前後操作
*/
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
/**
* 建立具有真實主機和埠的正確URI,有些系統使用帶有邏輯服務名的URL作為主機,呼叫該方法將會使用 host:port 來替換邏輯服務名
* @param instance :用於重建URI的服務例項
* @param original :具有邏輯服務名的URL
* @return A reconstructed URI.
*/
URI reconstructURI(ServiceInstance instance, URI original);
}
RibbonLoadBalancerClient 實現如下:
主要看下從 ServiceInstanceChooser,LoadBalancerClient 中實現的介面方法
public class RibbonLoadBalancerClient implements LoadBalancerClient {
// 工廠:主要用來建立客戶端,建立負載均衡器,進行客戶端配置等
// 對於每一個客戶端名稱都會建立一個Spring ApplicationContext,可以從中獲取需要的bean
private SpringClientFactory clientFactory;
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
..................
}
public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> {
// 獲取客戶端
public <C extends IClient<?, ?>> C getClient(String name, Class<C> clientClass) {
return getInstance(name, clientClass);
}
// 獲取負載均衡器
public ILoadBalancer getLoadBalancer(String name) {
return getInstance(name, ILoadBalancer.class);
}
//獲取客戶端配置
public IClientConfig getClientConfig(String name) {
return getInstance(name, IClientConfig.class);
}
// 獲取 RibbonLoadBalancerContext
public RibbonLoadBalancerContext getLoadBalancerContext(String serviceId) {
return getInstance(serviceId, RibbonLoadBalancerContext.class);
}
// 獲取對應的bean
public <T> T getInstance(String name, Class<T> type) {
AnnotationConfigApplicationContext context = getContext(name);
.....
return context.getBean(type);
.....
}
}
choose() 方法
該方法主要用來獲取一個可用的服務例項
public ServiceInstance choose(String serviceId, Object hint) {
Server server = getServer(getLoadBalancer(serviceId), hint);
if (server == null) {
return null;
}
// RibbonServer 實現了 ServiceInstance
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}
// 根據服務ID獲取負載均衡器,會呼叫 SpringClientFactory 的方法進行獲取
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
// 根據負載均衡器來獲取可用的服務
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
return loadBalancer.chooseServer(hint != null ? hint : "default");
}
最後會呼叫 ILoadBalancer.chooseServer 來獲取可用服務,後面再來說 ILoadBalancer 。
execute() 方法
該方法執行請求
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if (serviceInstance instanceof RibbonServer) {
server = ((RibbonServer) serviceInstance).getServer();
}
RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
// 狀態記錄器,記錄著服務的狀態
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
...........
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
...........
}
// apply 方法呼叫如下,最終返回 ClientHttpResponse
public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request,
final byte[] body, final AsyncClientHttpRequestExecution execution)
throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
return this.loadBalancer.execute(serviceName,
new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
@Override
public ListenableFuture<ClientHttpResponse> apply(
final ServiceInstance instance) throws Exception {
HttpRequest serviceRequest = new ServiceRequestWrapper(request,
instance, AsyncLoadBalancerInterceptor.this.loadBalancer);
return execution.executeAsync(serviceRequest, body);
}
});
}
以上就是負載均衡器流程圖左邊部分的原理,接下來看下右邊的部分
ILoadBalancer
通過上面的分析,負載均衡器獲取一個可用的服務,最終會呼叫 ILoadBalancer 的 chooseServer 方法,下面就來看下 ILoadBalancer 的實現原理
首先來看下 ILoadBalancer 的整體類圖:
在上面的類圖中,主要的邏輯實在 BaseLoadBalancer 中實現,而 DynamicServerListLoadBalancer 主要提供動態獲取服務列表的能力。
ILoadBalancer
首先來看下 ILoadBalancer,它表示一個負載均衡器介面,
public interface ILoadBalancer {
// 新增服務
public void addServers(List<Server> newServers);
//獲取服務
public Server chooseServer(Object key);
//標記某個服務下線
public void markServerDown(Server server);
//獲取狀態為UP的所有可用服務列表
public List<Server> getReachableServers();
//獲取所有服務列表,包括可用的和不可用的
public List<Server> getAllServers();
}
AbstractLoadBalancer
實現 ILoadBalancer 介面,提供一些預設實現
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public enum ServerGroup{ALL, STATUS_UP, STATUS_NOT_UP}
public Server chooseServer() {
return chooseServer(null);
}
public abstract List<Server> getServerList(ServerGroup serverGroup);
// 獲取狀態
public abstract LoadBalancerStats getLoadBalancerStats();
}
IClientConfigAware
客戶端配置
public interface IClientConfigAware {
public abstract void initWithNiwsConfig(IClientConfig clientConfig);
}
BaseLoadBalancer
負載均衡器的主要實現邏輯,在該類中,會根據負載均衡策略 IRule 來獲取可用的服務,會通過 IPing 來檢測服務的可用性,此外,該類中從 EurkaClient 獲取到服務列表後,會在該類中儲存下來,會維護所有的服務列表和可用的服務列表。
首先來看下它的一些屬性,然後再來看每個對應的方法
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
// 預設的負載均衡策略:輪詢選擇服務例項
private final static IRule DEFAULT_RULE = new RoundRobinRule();
protected IRule rule = DEFAULT_RULE;
// 預設 ping 的策略,會呼叫 IPing 來檢測服務是否可用
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
protected IPing ping = null;
// 所有服務列表
protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
// 狀態為 up 的服務列表
protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());
// 鎖
protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
protected ReadWriteLock upServerLock = new ReentrantReadWriteLock();
// 定時任務,去 ping 服務是否可用
protected Timer lbTimer = null;
// ping 的時間間隔,10秒
protected int pingIntervalSeconds = 10;
// ping 的最大次數
protected int maxTotalPingTimeSeconds = 5;
// 負載均衡器的狀態
protected LoadBalancerStats lbStats;
// 客戶端配置
private IClientConfig config;
// 服務列表變化監聽器
private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>();
// 服務狀態變化監聽器
private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();
// 構造方法,使用預設的配置來建立負載均衡器,還有其他過載的構造方法,可用根據需要來建立負載均衡器
public BaseLoadBalancer() {
this.name = DEFAULT_NAME;
this.ping = null;
setRule(DEFAULT_RULE);
setupPingTask();
lbStats = new LoadBalancerStats(DEFAULT_NAME);
}
.....................
}
在上面的屬性中,Ribbon 提供了一些預設的配置:
IClientConfig 表示客戶端的配置,實現類為 DefaultClientConfigImpl,在該類中配置了預設的值,:
public class DefaultClientConfigImpl implements IClientConfig {
// ping 的預設策略 DummyPing
public static final String DEFAULT_NFLOADBALANCER_PING_CLASSNAME = "com.netflix.loadbalancer.DummyPing"; // DummyPing.class.getName();
public static final String DEFAULT_NFLOADBALANCER_RULE_CLASSNAME = "com.netflix.loadbalancer.AvailabilityFilteringRule";
public static final String DEFAULT_NFLOADBALANCER_CLASSNAME = "com.netflix.loadbalancer.ZoneAwareLoadBalancer";
public static final int DEFAULT_MAX_TOTAL_TIME_TO_PRIME_CONNECTIONS = 30000;
public static final int DEFAULT_MAX_RETRIES_PER_SERVER_PRIME_CONNECTION = 9;
.............................................
}
IRule 表示 負載均衡策略,即如何去選擇服務例項,預設為 RoundRobinRule,即通過輪詢的方式選擇服務。Ribbon 預設提供的有 7 種。
IPing 表示檢測服務是否可用策略,Ribbon 也提供了很多策略,共有 5 種,預設為 DummyPing
關於 IRule 和 IPing 的策略,後面會專門進行研究。
在 BaseLoadBalancer 中,除了提供一個無參的構造方法(使用的是預設的配置)外,還提供了很多過載的構造方法,下面來看下根據客戶端的配置來建立BaseLoadBalancer :
// 根據客戶端配置來建立 BaseLoadBalancer
public BaseLoadBalancer(IClientConfig config) {
initWithNiwsConfig(config);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
// 負載均衡策略
String ruleClassName = (String) clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerRuleClassName);
// ping 策略
String pingClassName = (String) clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerPingClassName);
IRule rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(ruleClassName, clientConfig);
IPing ping = (IPing) ClientFactory.instantiateInstanceWithClientConfig(pingClassName, clientConfig);
// 狀態
LoadBalancerStats stats = createLoadBalancerStatsFromConfig(clientConfig);
// 初始化配置
initWithConfig(clientConfig, rule, ping, stats);
}
void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
this.config = clientConfig;
String clientName = clientConfig.getClientName();
this.name = clientName;
// ping 的週期
int pingIntervalTime = Integer.parseInt(clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerPingInterval,Integer.parseInt("30")));
// 最大 ping 的次數
int maxTotalPingTime = Integer.parseInt(clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,Integer.parseInt("2")));
setPingInterval(pingIntervalTime);
setMaxTotalPingTime(maxTotalPingTime);
setRule(rule);
setPing(ping);
setLoadBalancerStats(stats);
rule.setLoadBalancer(this);
if (ping instanceof AbstractLoadBalancerPing) {
((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
}
.................
// 註冊監控/可忽略
init();
}
在上面的構造方法中,可用根據客戶端配置的資訊來建立一個BaseLoadBalancer,如客戶端可以配置負載均衡策略,ping的策略,ping的時間間隔和最大次數等。
判斷服務的可用性
在 Ribbon 中,負載均衡器多久才去更新獲取服務列表呢?在 BaseLoadBalancer 類中,有一個 setupPingTask 方法,在該方法內部,會建立 PingTask 定時任務去檢測服務的可用性,而 PingTask 又會建立 Pinger 物件,在 Pinger 物件的 runPinger() 方法中,會根據ping策略即 pingerStrategy 的 pingServers(ping, allServer) 來獲取服務的可用性,主要邏輯如下:
void setupPingTask() {
if (canSkipPing()) {
return;
}
// 如果已經有了定時任務,則取消
if (lbTimer != null) {
lbTimer.cancel();
}
// 第二個引數為true,表示它是一個deamon執行緒
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true);
// 建立 PingTask, 它繼承於 TimerTask,定時執行 run 方法
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
......
}
class PingTask extends TimerTask {
public void run() {
// 預設 pingStrategy = new SerialPingStrategy()
new Pinger(pingStrategy).runPinger();
}
}
public void runPinger() throws Exception {
// 如果正在ping,則返回
if (!pingInProgress.compareAndSet(false, true)) {
return; // Ping in progress - nothing to do
}
// 所有的服務,包括不可用的服務
Server[] allServers = null;
boolean[] results = null;
Lock allLock = null;
Lock upLock = null;
try {
allLock = allServerLock.readLock();
allLock.lock();
allServers = allServerList.toArray(new Server[allServerList.size()]);
allLock.unlock();
// 所有服務的數量
int numCandidates = allServers.length;
// 所有服務ping的結果
results = pingerStrategy.pingServers(ping, allServers);
// 狀態可用的服務列表
final List<Server> newUpList = new ArrayList<Server>();
// 狀態改變的服務列表
final List<Server> changedServers = new ArrayList<Server>();
for (int i = 0; i < numCandidates; i++) {
// 最新的狀態
boolean isAlive = results[i];
Server svr = allServers[i];
// 老的狀態
boolean oldIsAlive = svr.isAlive();
// 更新狀態
svr.setAlive(isAlive);
// 如果狀態改變了,則放到集合中,會進行重新拉取
if (oldIsAlive != isAlive) {
changedServers.add(svr);
}
// 狀態可用的服務
if (isAlive) {
newUpList.add(svr);
}
}
upLock = upServerLock.writeLock();
upLock.lock();
upServerList = newUpList;
upLock.unlock();
// 變態改變監聽器
notifyServerStatusChangeListener(changedServers);
} finally {
// ping 完成
pingInProgress.set(false);
}
}
// 檢測服務的狀態
@Override
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
for (int i = 0; i < numCandidates; i++) {
results[i] = false;
if (ping != null) {
results[i] = ping.isAlive(servers[i]);
}
}
return results;
}
在上面的邏輯中,Ribbon 每10秒向 EurekaClient 傳送 ping 來判斷服務的可用性,如果服務的可用性發生了改變或服務的數量和之前的不一致,則會更新或重新拉取服務。有了這些服務之後,會根據負載均衡策略 IRule 來選擇一個可用的服務。
根據負載均衡策略 IRule 來選擇一個可用的服務
在前文說到 Ribbon 客戶端 RibbonLoadBalancerClient 選擇服務的時候,最終會呼叫 ILoadBalancer.chooseServer 來選擇服務,接下來就來看下這個方法:
public Server chooseServer(Object key) {
.......
//rule= new RoundRobinRule()
return rule.choose(key);
....
}
關於 Ribbon 的負載均衡策略 IRule, Ribbon 提供了 7 種,後面再來分析,現在只需要知道通過 IRule 來選擇服務就可以了。
初始化獲取所有服務列表
在上面的分析中,Ribbon 會每10秒定時的去檢測服務的可用性,如果服務狀態發生了變化則重新獲取,之後,再根據負載均衡策略 IRule 來選擇一個可用的服務;但是,在初始化的時候,是從哪裡獲取服務列表呢?下面就來分析這個問題
BaseLoadBalancer 有個子類 DynamicServerListLoadBalancer,它具有使用動態源獲取伺服器列表的功能。即伺服器列表在執行時可能會更改。此外,還可以通過條件來過濾掉不符合所需條件的服務。
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
// 是否正在進行服務列表的更新
protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
// 服務列表
volatile ServerList<T> serverListImpl;
// 服務過濾器
volatile ServerListFilter<T> filter;
}
1. 初始化時獲取所有服務
在 DynamicServerListLoadBalancer 中,有個 restOfInit 方法,在初始化時進行呼叫,在該方法中,會從 Eureka 客戶端中拉取所有的服務列表:
void restOfInit(IClientConfig clientConfig) {
.............
updateListOfServers();
........
}
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
// 獲取所有服務列表
servers = serverListImpl.getUpdatedListOfServers();
// 根據條件過濾服務
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
}
}
updateAllServerList(servers);
}
protected void updateAllServerList(List<T> ls) {
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // 狀態設定為可用
}
setServersList(ls);
super.forceQuickPing(); // 強制檢測服務狀態
} finally {
serverListUpdateInProgress.set(false);
}
}
}
獲取所有服務列表 servers = serverListImpl.getUpdatedListOfServers(); 最終會呼叫 DiscoveryEnabledNIWSServerList 的方法:
servers = serverListImpl.getUpdatedListOfServers();
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
........
// 通過 eurekaClient 來獲取註冊的服務列表
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
.....
DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
serverList.add(des);
}
}
......
}
}
return serverList;
}
通過上面方法的分析,Ribbon 最終會通過 EurekaClient 來獲取服務列表的,而 EurekaClient 的實現類是 DiscoveryClient,而在 Eureka 中,DiscoveryClient 類具有服務的註冊,發現,續約,獲取服務列表等功能。
此外,該類中還可以通過過濾器來獲取不符合條件的服務。
以上就是 Ribbon 負載均衡器的一個實現原理。最後再來看下流程圖,加深印象:
關於負載均衡策略 IRule 和 Ping 策略,下篇文章進行研究。