1. 程式人生 > 其它 >SpringCloud Ribbon 原始碼初讀

SpringCloud Ribbon 原始碼初讀

目錄

SpringCloud Ribbon 原始碼初讀

不想看原始碼,看的頭疼。沒辦法啊,得看啊。

為什麼要寫這篇文章

起因是看見微服務專案中服務間呼叫都常用RestTemplate,在程式啟動類中提供這樣一個Bean,然後在其上加上@LoadBalanced 就能實現服務的負載均衡呼叫,想究其咋實現的。網上也看了眾多文章和書籍,然後自己用IDE研究了下,故有此文章(看別人的不如自己親自動手試下看下,這樣記憶深刻,順便記錄下)。如有雷同純屬巧合。

從註解@LoadBalanced入手

看原始碼

此註解上的元註解,我就不在此贅述。

首先看這個註解的註釋,英文註釋,不怕,直接翻譯一下。

大致意思是:這個註解標記RestTemplate ,用 LoadBalancerClient 這個類來配置。

現在進入LoadBalancerClient 看一下,它也是一個介面。

不截圖了 截圖 註釋太多圖片太大,我簡化一下。

public interface LoadBalancerClient extends ServiceInstanceChooser {

	//根據傳入的serviceId使用負載均衡器挑選服務例項執行請求內容(有一定的負載均衡策略預設是輪詢)
	<t> T execute(String serviceId, LoadBalancerRequest<t> request) throws IOException;

	/**
	* 上面方法的過載,多了一個引數ServiceInstance,
    * 這是一個介面表示服務發現系統中的服務例項物件
    **/
	<t> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<t> request) throws IOException;

	/**
	* 看方法名的意思,重建URI的方法,建造一個host:port 形式的URI。
	* 為保證高可用,一般一個服務有多個例項,一般我們呼叫服務時,使用服務名進行呼叫,
	* 避免使用具體IP地址
	* 即 將http://127.0.0.1:8099/service/ser 變為 http://someone-service/service/ser
	* 第一個引數 instance 是ServiceInstance型別,它的例項包含Host、port、uri等屬性;
	* 第二個引數 original 只是使用 服務名 為host 的uri
	* 返回引數 URI 返回的是 將兩個入參經過一定邏輯之後,拼接出的具體Host:Port 形式的請求地址
	**/
	URI reconstructURI(ServiceInstance instance, URI original);
}

別忘了,這個介面還繼承了一個ServiceInstanceChooser ,看一下。

/**
 *
 * 由使用負載均衡器來選擇傳送請求到的伺服器的類實現
 */
public interface ServiceInstanceChooser {

    /**
     * 根據傳入serviceId 
     * 為指定的服務從LoadBalancer中選擇服務例項
     */
    ServiceInstance choose(String serviceId);
}

看一下 LoadBalancerClient 所在的包

哎喲,不錯。看到個好東西 LoadBalancerAutoConfiguration

,從名字可以看出這是負載均衡器的自動配置類,開啟簡單看下。

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
public class LoadBalancerAutoConfiguration {

	@LoadBalanced
	@Autowired(required = false)
	private List<resttemplate> restTemplates = Collections.emptyList();

	@Bean
	public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
			final ObjectProvider<list<resttemplatecustomizer>> restTemplateCustomizers) {
		return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    customizer.customize(restTemplate);
                }
            }
        });
	}

	@Bean
	@ConditionalOnMissingBean
	public LoadBalancerRequestFactory loadBalancerRequestFactory(
			LoadBalancerClient loadBalancerClient) {
		return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
	}

	@Configuration
	@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
	static class LoadBalancerInterceptorConfig {
		@Bean
		public LoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient,
				LoadBalancerRequestFactory requestFactory) {
			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
		}

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final LoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
                List<clienthttprequestinterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
		}
	}
    
    //...剩餘程式碼未涉及未列出
}

有些程式碼已省略,因為根據@Condition系列註解這些省略的Bean不會載入。

根據LoadBalancerAutoConfiguration 類最上面的註解可以知道這是個配置類。

其成功載入的條件為下面兩個

  • @ConditionalOnClass(RestTemplate.class)

    RestTemplate 類必須存在於當前工程環境。

  • @ConditionalOnBean(LoadBalancerClient.class)

    在Spring上下文所載入的Bean中必須存在LoadBalancerClient(interface) 的實現Bean物件

此類主要載入的物件為:

  • 最上面有一個restTemplates ,被 @LoadBalanced 所註釋,其初始化了一個List<resttemplate> ,通過呼叫RestTemplateCustomizer 的例項來給所需客戶端 RestTemplate 增加 LoadBalancerInterceptor 攔截器。

  • LoadBalancerInterceptor ,看名字知道,其是一個攔截器,主要用於攔截客戶端所發的請求,來實現客戶端的負載均衡。

  • RestTemplateCustomizer,看程式碼邏輯,用於給RestTemplate 新增攔截器即 LoadBalancerInterceptor

LoadBalancerInterceptor如何為RestTemplate的負載均衡賦能

看程式碼

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

	private LoadBalancerClient loadBalancer;
	private LoadBalancerRequestFactory requestFactory;

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
		this.loadBalancer = loadBalancer;
		this.requestFactory = requestFactory;
	}

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
		// for backwards compatibility
		this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
	}

	@Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
		final URI originalUri = request.getURI();
		String serviceName = originalUri.getHost();
		Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
		return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
	}
}

通過它的原始碼和之前自動配置類的程式碼,明顯能發現其在攔截器中需要注入 LoadBalancerClient 的實現。當一個被@LoadBalanced 註解修飾的 RestTemplate 物件向外發起HTTP請求,會被 LoadBalancerInterceptor 的這個 intercept() 方法攔截。由於我們在使用RestTemplate 時採用服務名呼叫,所以在 final URI originalUri = request.getURI();
String serviceName = originalUri.getHost(); 這兩個語句執行後,能拿到服務名,其後呼叫了execute() 去根據服務名來選擇例項發起實際的請求。

分析至此,LoadBalancerClient 其實還只是一個介面,看下其實現,順著可以找到 RibbonLoadBalancerClient ,在包 org.springframework.cloud.netflix.ribbon 下,簡單看下它所覆寫的 execute 方法。

public class RibbonLoadBalancerClient implements LoadBalancerClient {
    
    @Override
	public <t> T execute(String serviceId, LoadBalancerRequest<t> request) throws IOException {
		ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
		Server server = getServer(loadBalancer);
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
				serviceId), serverIntrospector(serviceId).getMetadata(server));

		return execute(serviceId, ribbonServer, request);
	}
    
    @Override
	public <t> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<t> request) throws IOException {
		Server server = null;
		if(serviceInstance instanceof RibbonServer) {
			server = ((RibbonServer)serviceInstance).getServer();
		}
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}

		RibbonLoadBalancerContext context = this.clientFactory
				.getLoadBalancerContext(serviceId);
		RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

		try {
			T returnVal = request.apply(serviceInstance);
			statsRecorder.recordStats(returnVal);
			return returnVal;
		}
		// catch IOException and rethrow so RestTemplate behaves correctly
		catch (IOException ex) {
			statsRecorder.recordStats(ex);
			throw ex;
		}
		catch (Exception ex) {
			statsRecorder.recordStats(ex);
			ReflectionUtils.rethrowRuntimeException(ex);
		}
		return null;
	}
}

第一個execute方法中,首先通過 getServer 方法根據傳入的serviceId去獲取具體的服務例項,下面是getServer方法:

可以看到這裡獲取具體例項時並沒有用到我們上面說的 LoadBalancerClient 介面中的choose 函式,而是使用了Netflix Ribbon 自身的ILoadBalancer 介面中定義的 chooseServer 函式。下面來看ILoadBalancer 介面。

public interface ILoadBalancer {
    /**
    * 向負載均衡器中維護的例項列表增加服務例項
    **/
    void addServers(List<server> newServers);

    /**
    * 通過某種策略,從負載均衡器中選擇一個具體例項
    **/
    Server chooseServer(Object key);

    /**
    * 通知並標識負載均衡器中的某個例項已停止服務,不然負載均衡器在下一個獲取服務例項列表週期前會認為此例項是可用的。
    **/
    void markServerDown(Server server);

    /** 
    * 獲取伺服器的當前列表。
    * 如果為true,則應該只返回活動的和可用的伺服器
    * 2016-01-20 已被標記為過時 推薦使用下面兩個方法
    **/
    @Deprecated
    List<server> getServerList(boolean availableOnly);

    /**
    * 獲取只有啟動並可訪問的服務。
    **/
    List<server> getReachableServers();

    /**
    * 獲取所有服務,包括可正常訪問和不可正常訪問的。
    **/
    List<server> getAllServers();
}

該介面中涉及的 Server 物件,定義為一個典型的服務端節點物件,該類儲存一些元資料,包括但不限於host、port、zone以及一些部署資訊。

檢視ILoadBalancer 的實現,發現下面的類

  • AbstractLoadBalancer 包含大多數負載均衡實現所需的特性。
  • BaseLoadBalancer 實現了基礎的負載均衡。
  • DynamicServerListLoadBalancer 能夠使用動態源獲取候選伺服器列表。也就是說,伺服器列表可能在執行時被更改。
    它還包含一些工具,其中伺服器列表可以通過一個過濾條件來過濾出不滿足所需條件的伺服器。
  • NoOpLoadBalancer: 繼承了AbstractLoadBalancer ,但其實沒有LB
  • ZoneAwareLoadBalancer 負載均衡器,在選擇伺服器時可以避免一個區域作為一個整體。

Srping Cloud Ribbon 預設的負載均衡策略

public class RibbonClientConfiguration {	
	@Bean
	@ConditionalOnMissingBean
	public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
			ServerList<server> serverList, ServerListFilter<server> serverListFilter,
			IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
		if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
			return this.propertiesFactory.get(ILoadBalancer.class, config, name);
		}
		return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
				serverListFilter, serverListUpdater);
	}
}

通過檢視RibbonClientConfiguration 類,可以知道其整合時 預設採用了 ZoneAwareLoadBalancer

繼續回到 RibbonLoadBalancerClientexecute 執行,在通過 ZoneAwareLoadBalancerchooseServer 函式 獲取了負載均衡策略分配到的服例項的物件Server之後,將其內容包裝成RibbonServer 物件,此物件除了儲存服務例項的資訊之外,還包含了服務名serviceId、是否使用https,一個metadata(Map型別)等。然後使用該物件再回調LoaderbalancerInterceptor 請求攔截器中LoadBalancerRequestapply(ServiceInstance instance) 函式,向一個實際的具體服務例項發起請求,從而實現一開始服務名為host 的URI請求到host:post 形式的實際訪問地址的轉換。

apply 入參ServiceInstance 介面物件是對服務例項的抽象定義。如下,

該介面抽象了服務治理體系下每個服務例項需要的一些基本資訊,包括serviceId、host、port等等。

而上面提到的 RibbonServer 物件就是ServiceInstance 的具體實現,正如我上面所說它除了包含Server物件之外,還儲存了服務名、是否使用https,以及一個metadata的Map集合。如下:

繼續深入,apply 函式在得到ServiceInstance 物件後如何通過LoadBalancerClient 介面中的 reconstructURI 來拼出具體請求地址。深入apply之後可以得到如下類:

public class AsyncLoadBalancerInterceptor implements AsyncClientHttpRequestInterceptor {

	private LoadBalancerClient loadBalancer;

	public AsyncLoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
		this.loadBalancer = loadBalancer;
	}

	@Override
	public ListenableFuture<clienthttpresponse> intercept(final HttpRequest request, final byte[] body,
			final AsyncClientHttpRequestExecution execution) throws IOException {
		final URI originalUri = request.getURI();
		String serviceName = originalUri.getHost();
		return this.loadBalancer.execute(serviceName,
				new LoadBalancerRequest<listenablefuture<clienthttpresponse>>() {
					@Override
					public ListenableFuture<clienthttpresponse> apply(final ServiceInstance instance)
							throws Exception {
						HttpRequest serviceRequest = new ServiceRequestWrapper(request,
								instance, loadBalancer);
						return execution.executeAsync(serviceRequest, body);
					}

				});
	}
}

在這個實現中,可以看到一個關鍵物件 ServiceRequestWrapper 該物件繼承了HttpRequestWrapper 並重寫了getURI函式,重寫後的getURI 通過呼叫LoadBalancerClient 介面的 reconstructURI 函式重建URI來訪問目標地址。具體如下:

public class ServiceRequestWrapper extends HttpRequestWrapper {
	private final ServiceInstance instance;
	private final LoadBalancerClient loadBalancer;

	public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
								 LoadBalancerClient loadBalancer) {
		super(request);
		this.instance = instance;
		this.loadBalancer = loadBalancer;
	}

	@Override
	public URI getURI() {
		URI uri = this.loadBalancer.reconstructURI(
				this.instance, getRequest().getURI());
		return uri;
	}
}

AsyncLoadBalancerInterceptor 中,AsyncClientHttpRequestExecution(它是介面) 的例項具體執行execution.executeAsync(serviceRequest, body);時會呼叫InterceptingAsyncClientHttpRequest(org.springframework.http.client),下private class AsyncRequestExecution類中的executeAsync方法,如下:

private class AsyncRequestExecution implements AsyncClientHttpRequestExecution {

		private Iterator<asyncclienthttprequestinterceptor> iterator;

		public AsyncRequestExecution() {
			this.iterator = interceptors.iterator();
		}

		@Override
		public ListenableFuture<clienthttpresponse> executeAsync(HttpRequest request, byte[] body)
				throws IOException {

			if (this.iterator.hasNext()) {
				AsyncClientHttpRequestInterceptor interceptor = this.iterator.next();
				return interceptor.intercept(request, body, this);
			}
			else {
				URI uri = request.getURI();
				HttpMethod method = request.getMethod();
				HttpHeaders headers = request.getHeaders();

				Assert.state(method != null, "No standard HTTP method");
				AsyncClientHttpRequest delegate = requestFactory.createAsyncRequest(uri, method);
				delegate.getHeaders().putAll(headers);
				if (body.length > 0) {
					StreamUtils.copy(body, delegate.getBody());
				}

				return delegate.executeAsync();
			}
		}
	}

可以看到,在建立請求時 requestFactory.createAsyncRequest(uri, method); ,這裡的uri由request.getURI()得到,這裡會呼叫之前所說的 ServiceRequestWrapper 的getURI,此時,它就會使用RibbonLoadBalancerClient中實現的reconstructURI來組織具體請求的服務例項地址。如下

public URI reconstructURI(ServiceInstance instance, URI original) {
		Assert.notNull(instance, "instance can not be null");
		String serviceId = instance.getServiceId();
		RibbonLoadBalancerContext context = this.clientFactory
				.getLoadBalancerContext(serviceId);

		URI uri;
		Server server;
		if (instance instanceof RibbonServer) {
			RibbonServer ribbonServer = (RibbonServer) instance;
			server = ribbonServer.getServer();
			uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
		} else {
			server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
			IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
			ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
			uri = updateToSecureConnectionIfNeeded(original, clientConfig,
					serverIntrospector, server);
		}
		return context.reconstructURIWithServer(server, uri);
	}

reconstructURI函式中我們可以看到,它通過ServiceInstance物件的serviceId,從SpringClientFactory(clientFactory)物件中獲取對應serviceId的負載均衡器上下文RibbonLoadBalancerContext

說明:

  • SpringClientFactory :看名字知道是個工廠類,其實它主要用於建立客戶端、負載均衡器和客戶端配置例項。它為每個不同名稱客戶端建立一個Spring ApplicationContext,並從中提取它所需要的bean。即為每個不同名的Ribbon 客戶端生成不同Spring上下文。
  • RibbonLoadBalancerContext:是LoadBalancerContext的子類 ,LoadBalancerContext類用於儲存一些被負載均衡器使用的上下文內容和API操作,reconstructURIWithServer就在其中。

第一眼看見 reconstructURIWithServer 感覺和 reconstructURI 挺相似從名字可以看出來。只是前者入參需要傳入一個Server物件(Netflix所定義的),後者需要的是ServiceInstance 物件(springcloud所定義的)。所以RibbonLoadBalancerClient 中覆寫reconstructURI 的方法做了轉換,使用ServiceInstance的host、port new了一個Server物件以便給reconstructURIWithServer 使用。reconstructURIWithServer 的具體實現中,他從Server物件中獲取host、port,然後根據以服務名為host的URI 物件original中獲取其他請求資訊,將兩者內容拼接整合,最後構成要訪問的服務例項的具體地址。下方是具體邏輯:

public URI reconstructURIWithServer(Server server, URI original) {
        String host = server.getHost();
        int port = server.getPort();
        String scheme = server.getScheme();
        
        if (host.equals(original.getHost()) 
                && port == original.getPort()
                && scheme == original.getScheme()) {
            return original;
        }
        if (scheme == null) {
            scheme = original.getScheme();
        }
        if (scheme == null) {
            scheme = deriveSchemeAndPortFromPartialUri(original).first();
        }

        try {
            StringBuilder sb = new StringBuilder();
            sb.append(scheme).append("://");
            if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
                sb.append(original.getRawUserInfo()).append("@");
            }
            sb.append(host);
            if (port >= 0) {
                sb.append(":").append(port);
            }
            sb.append(original.getRawPath());
            if (!Strings.isNullOrEmpty(original.getRawQuery())) {
                sb.append("?").append(original.getRawQuery());
            }
            if (!Strings.isNullOrEmpty(original.getRawFragment())) {
                sb.append("#").append(original.getRawFragment());
            }
            URI newURI = new URI(sb.toString());
            return newURI;            
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

至此,SpringCloud Ribbon實現客戶端負載均衡的基本操作已經瞭解。包括LoadBalancerIntercept 攔截器對RestTemplate請求進行攔截、URI的轉換、均衡器策略的選擇(預設使用了ZoneAwareLoadBalancer,ILoadBalancer的實現)。


參考書籍或文章:

SpringCloud 微服務實戰