Spring Cloud (Part 2): Service Registration and Discovery
阿新 • Published: 2019-06-10
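- Using the Eureka registry
1. Starting the Eureka server
1.1 Add the POM dependencies. This example uses Spring Cloud version Finchley.SR2.

    <!-- A BOM imported with scope "import" must sit inside dependencyManagement -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
    </dependencies>

1.2 Configure the server in cluster mode. This example starts a three-node Eureka cluster, configured as follows.
node1 configuration:

    # Eureka client settings
    eureka:
      client:
        service-url:
          defaultZone: http://node2:10002/eureka/,http://node3:10003/eureka/
        # Whether this client registers itself with Eureka
        register-with-eureka: false
        # Whether this client fetches the service list from Eureka
        fetch-registry: false
      instance:
        # Register by host name
        hostname: node1
        # Instance id
        instance-id: ${eureka.instance.hostname}:${server.port}:@project.version@
      # Read timeout between cluster nodes, in milliseconds
      server:
        peer-node-read-timeout-ms: 1000
    # Service port
    server:
      port: 10001

node2 configuration:

    # Eureka client settings
    eureka:
      client:
        service-url:
          defaultZone: http://node1:10001/eureka/,http://node3:10003/eureka/
        # Whether this client registers itself with Eureka
        register-with-eureka: false
        # Whether this client fetches the service list from Eureka
        fetch-registry: false
      instance:
        # Register by host name
        hostname: node2
        # Instance id
        instance-id: ${eureka.instance.hostname}:${server.port}:@project.version@
      # Read timeout between cluster nodes, in milliseconds
      server:
        peer-node-read-timeout-ms: 1000
    # Service port
    server:
      port: 10002

node3 configuration:

    # Eureka client settings
    eureka:
      client:
        service-url:
          defaultZone: http://node1:10001/eureka/,http://node2:10002/eureka/
        # Whether this client registers itself with Eureka
        register-with-eureka: false
        # Whether this client fetches the service list from Eureka
        fetch-registry: false
      instance:
        # Register by host name
        hostname: node3
        # Instance id
        instance-id: ${eureka.instance.hostname}:${server.port}:@project.version@
      # Read timeout between cluster nodes, in milliseconds
      server:
        peer-node-read-timeout-ms: 1000
    # Service port
    server:
      port: 10003

Note: each Eureka server must list the other server nodes so that together they form a cluster. With the configuration above, running the main method of each of the three services starts the cluster made up of node1, node2, and node3, which then provides the registration service to the outside.

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

    // @EnableEurekaServer turns this Spring Boot application into a Eureka server node
    @EnableEurekaServer
    @SpringBootApplication
    public class EurekaServiceApplication {

        public static void main(String[] args) {
            SpringApplication.run(EurekaServiceApplication.class, args);
        }
    }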
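The rule in the note above, that each node's defaultZone lists every other node in the cluster, can be sketched as a small helper. This is only an illustration: the class `PeerUrls` and its method `defaultZoneFor` are hypothetical names and not part of Eureka, and the host names and ports are the ones used in this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Eureka: derives each node's defaultZone value.
final class PeerUrls {

    // Returns the URLs of every node except `self`, comma-separated,
    // in the same form as the defaultZone entries in the configuration.
    static String defaultZoneFor(String self, Map<String, Integer> nodes) {
        return nodes.entrySet().stream()
                .filter(e -> !e.getKey().equals(self))
                .map(e -> "http://" + e.getKey() + ":" + e.getValue() + "/eureka/")
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the nodes in insertion order.
        Map<String, Integer> nodes = new LinkedHashMap<>();
        nodes.put("node1", 10001);
        nodes.put("node2", 10002);
        nodes.put("node3", 10003);
        for (String node : nodes.keySet()) {
            System.out.println(node + " -> " + defaultZoneFor(node, nodes));
        }
    }
}
```

Generating the value this way makes it harder to forget a peer when a fourth node is added, but writing the list by hand as in the configuration files works just as well for three nodes.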
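2. Using the Eureka client
Any application that interacts with the Eureka server can be called a client. Through the Eureka server a client can register services and fetch the service list. The example below shows how to register a service.
2.1 Add the POM dependency

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>

2.2 Configure the registry

    spring:
      application:
        name: orderService
    server:
      port: 8761
    eureka:
      instance:
        hostname: myapp
      client:
        registerWithEureka: true
        fetchRegistry: true
        serviceUrl:
          defaultZone: http://node2:10002/eureka/,http://node3:10003/eureka/,http://node1:10001/eureka/

2.3 Start the service and register it

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    // Neither annotation below is required here; see the note that follows.
    //@EnableDiscoveryClient
    //@EnableEurekaClient
    @SpringBootApplication
    public class OrderApplication {

        /**
         * Application entry point. Starting the application registers it with Eureka.
         *
         * @param args command-line arguments
         */
        public static void main(String[] args) {
            SpringApplication.run(OrderApplication.class, args);
        }
    }

Note: the client starter is on the classpath, and the configuration file already tells the client to register with the server and to fetch the service list from it, so the main class does not need the @EnableEurekaClient or @EnableDiscoveryClient annotation.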
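- Spring Cloud registration and discovery: source code walkthrough
First, we trace the entry point of the registration feature and use it as the starting point for seeing how Eureka completes the whole registration process.
1. EurekaClientAutoConfiguration is a configuration class that initializes the client. Through this class, registration is carried out while the service starts up:

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
    @org.springframework.cloud.context.config.annotation.RefreshScope
    @Lazy
    public EurekaClient eurekaClient(ApplicationInfoManager manager,
                                     EurekaClientConfig config,
                                     EurekaInstanceConfig instance) {
        manager.getInfo(); // force initialization
        return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
    }

2. CloudEurekaClient extends DiscoveryClient, and the core registration flow is carried out in the DiscoveryClient constructor, shown below: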

    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
                    AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }

        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();

        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }

        this.backupRegistryProvider = backupRegistryProvider;

        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());

        fetchRegistryGeneration = new AtomicLong(0);

        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null
                ? null : remoteRegionsToFetch.get().split(","));

        if (config.shouldFetchRegistry()) {
            // If fetching the service list from Eureka is enabled, create a staleness monitor for list updates
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this,
                    METRIC_REGISTRY_PREFIX + "lastUpdateSec_",
                    new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        if (config.shouldRegisterWithEureka()) {
            // If registration is enabled, create a staleness monitor for heartbeats to Eureka
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this,
                    METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_",
                    new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

        // Neither registry fetching nor registration is enabled: return early
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(
                    new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);

            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());

            return;  // no need to setup up an network tasks and we are done
        }

        // From here on, registration and/or service-list fetching is enabled
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            // A scheduler with two threads: one drives the heartbeat thread pool,
            // the other drives the service-list refresh thread pool
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            // Heartbeat thread pool; a dedicated pool isolates heartbeats from other work
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            // Service-list refresh thread pool, likewise isolated in its own pool
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);

            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }

        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                // Registers at startup by calling the remote Eureka server,
                // e.g. http://node1:10001/eureka/apps/, as a REST call made through Jersey;
                // the detailed registration code is shown further down
                if (!register()) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        // Initializes the scheduled tasks: the local service-list refresh task and the heartbeat task
        initScheduledTasks();

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    }

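The executor layout in the constructor (one two-thread scheduler that only triggers work, plus a separate direct-handoff pool each for heartbeats and cache refreshes) can be tried out in isolation with the JDK alone. The following is a simplified sketch under stated assumptions, not Eureka's code: the class `TwoPoolSketch`, the 10 ms period, and the maximum pool size of 2 are illustrative choices, and the real client wraps each task in its own supervisor rather than calling `execute` directly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for the executor layout in the constructor.
final class TwoPoolSketch {

    // Daemon threads, so a forgotten pool never keeps the JVM alive.
    private static final ThreadFactory DAEMON = r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    };

    // One core thread, at most two (assumed), direct handoff through a SynchronousQueue.
    private static ThreadPoolExecutor newWorkerPool() {
        return new ThreadPoolExecutor(1, 2, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), DAEMON);
    }

    // Schedules both periodic jobs and returns true once the requested
    // number of heartbeat and refresh ticks has run on the worker pools.
    static boolean runTicks(int heartbeats, int refreshes) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2, DAEMON);
        ThreadPoolExecutor heartbeatExecutor = newWorkerPool();
        ThreadPoolExecutor cacheRefreshExecutor = newWorkerPool();
        CountDownLatch heartbeatLatch = new CountDownLatch(heartbeats);
        CountDownLatch refreshLatch = new CountDownLatch(refreshes);
        try {
            // The scheduler thread only hands the work over; the pools do the work.
            scheduler.scheduleWithFixedDelay(
                    () -> heartbeatExecutor.execute(heartbeatLatch::countDown),
                    0, 10, TimeUnit.MILLISECONDS);
            scheduler.scheduleWithFixedDelay(
                    () -> cacheRefreshExecutor.execute(refreshLatch::countDown),
                    0, 10, TimeUnit.MILLISECONDS);
            return heartbeatLatch.await(5, TimeUnit.SECONDS)
                    && refreshLatch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
            heartbeatExecutor.shutdownNow();
            cacheRefreshExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("all ticks ran: " + runTicks(3, 2));
    }
}
```

Keeping the two kinds of work in separate pools means a slow registry fetch cannot delay a heartbeat, which is the isolation the comments in the constructor refer to.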
3. The registration call itself: see the register method of the AbstractJerseyEurekaHttpClient class. It simply sends an HTTP request, as follows:

    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}",
                        serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

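To make the shape of that request concrete, here is a minimal sketch that builds (but does not send) an equivalent POST with the JDK's own `java.net.http` API instead of Jersey. The class `RegisterRequestSketch` is hypothetical, the application name `ORDERSERVICE` is an assumed example value, and the `{}` body is only a placeholder for the serialized InstanceInfo, whose actual format is not shown in this article.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical sketch: the same request shape as register(), built with java.net.http.
final class RegisterRequestSketch {

    // Builds a POST to <serviceUrl>apps/<appName>. The serviceUrl must end with "/",
    // otherwise URI.resolve would replace its last path segment.
    static HttpRequest buildRegisterRequest(String serviceUrl, String appName, String instanceJson) {
        URI target = URI.create(serviceUrl).resolve("apps/" + appName);
        return HttpRequest.newBuilder(target)
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    public static void main(String[] args) {
        // "{}" is a placeholder body, not the real registration payload.
        HttpRequest request = buildRegisterRequest("http://node1:10001/eureka/", "ORDERSERVICE", "{}");
        System.out.println(request.method() + " " + request.uri());
    }
}
```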
4. Heartbeat renewal and cache refresh: this step is set up in initScheduledTasks.
Heartbeat thread pool: once the first registration has completed, something has to keep that registration valid. The heartbeat thread (HeartbeatThread) does this by renewing the lease. The scheduled task described earlier periodically contacts the Eureka server to tell it that the service is still there and still valid. The renewal logic is as follows:

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(
                    instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                // The server no longer knows this instance: register again
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

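The control flow of renew() (200 means the lease was renewed, 404 means the server has forgotten the instance and the client registers again) can be exercised without a real server. The sketch below uses a hypothetical in-memory `FakeServer`; the class names, the instance id `myapp:8761`, and the choice of 204 as the success status for registration are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the renew-or-re-register flow, with no network involved.
final class RenewSketch {

    // In-memory stand-in for the server's lease table.
    static final class FakeServer {
        private final Set<String> leases = new HashSet<>();

        // 200 if the instance has a lease, 404 if the server does not know it.
        int sendHeartBeat(String instanceId) {
            return leases.contains(instanceId) ? 200 : 404;
        }

        // Stores the lease; 204 is the status this sketch treats as success (assumed).
        int register(String instanceId) {
            leases.add(instanceId);
            return 204;
        }

        // Simulates the server dropping the lease.
        void evict(String instanceId) {
            leases.remove(instanceId);
        }
    }

    private final FakeServer server;
    private final String instanceId;
    private int reRegisterCount;

    RenewSketch(FakeServer server, String instanceId) {
        this.server = server;
        this.instanceId = instanceId;
    }

    // Mirrors the branch structure of renew(): heartbeat first, re-register on 404.
    boolean renew() {
        int status = server.sendHeartBeat(instanceId);
        if (status == 404) {
            reRegisterCount++;
            return server.register(instanceId) == 204;
        }
        return status == 200;
    }

    int reRegisterCount() {
        return reRegisterCount;
    }

    public static void main(String[] args) {
        FakeServer server = new FakeServer();
        RenewSketch client = new RenewSketch(server, "myapp:8761");
        System.out.println("first renew: " + client.renew() + ", re-registrations: " + client.reRegisterCount());
        System.out.println("second renew: " + client.renew() + ", re-registrations: " + client.reRegisterCount());
    }
}
```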
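The service-list refresh thread mentioned above is what keeps the local service list valid. Consider the following scenario:
4.1 Service A starts for the first time and registers with the Eureka server.
4.2 A Eureka client fetches the service list through CacheRefreshThread.
4.3 Service A goes down and fails to renew its lease with the server through HeartbeatThread in time.
4.4 CacheRefreshThread fetches the latest service list, and that list no longer contains service A.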
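The four steps of this scenario can be replayed with a toy registry that tracks the last renewal time of each service. This is a sketch under stated assumptions and not the server's implementation: the class `LeaseExpirySketch` is hypothetical, time is passed in explicitly instead of read from a clock, and the 90-second lease duration is an illustrative value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy registry: a service disappears once its lease is not renewed in time.
final class LeaseExpirySketch {
    private final long leaseDurationMs;
    // Service name -> time of last registration or renewal (TreeMap for a stable order).
    private final Map<String, Long> lastRenewal = new TreeMap<>();

    LeaseExpirySketch(long leaseDurationMs) {
        this.leaseDurationMs = leaseDurationMs;
    }

    // Steps 4.1 and the heartbeat: record that the service is alive at nowMs.
    void registerOrRenew(String service, long nowMs) {
        lastRenewal.put(service, nowMs);
    }

    // Step 4.3 seen from the server: drop every lease older than the lease duration.
    void evictExpired(long nowMs) {
        lastRenewal.entrySet().removeIf(e -> nowMs - e.getValue() > leaseDurationMs);
    }

    // Steps 4.2 and 4.4: what a client's cache refresh would see.
    List<String> fetchRegistry() {
        return new ArrayList<>(lastRenewal.keySet());
    }

    public static void main(String[] args) {
        LeaseExpirySketch registry = new LeaseExpirySketch(90_000);
        registry.registerOrRenew("A", 0);
        registry.registerOrRenew("B", 0);
        System.out.println("after registration: " + registry.fetchRegistry());
        registry.registerOrRenew("B", 60_000); // B keeps renewing, A has gone down
        registry.evictExpired(100_000);
        System.out.println("after A missed its renewal: " + registry.fetchRegistry());
    }
}
```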
CacheRefreshThread implements the service-list refresh with the following logic:

    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            Applications applications = getApplications();

            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                getAndStoreFullRegistry();
            } else {
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}",
                    appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }
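The long condition at the top of fetchRegistry decides between a full fetch and a delta update. Pulled out as a pure function it is easier to read and to test. This is a sketch, not Eureka's API: the class `FetchModeSketch` and its parameter names are hypothetical, and the null check on the cached applications is folded into a cached application count of zero.

```java
// Hypothetical extraction of the full-versus-delta decision in fetchRegistry.
final class FetchModeSketch {

    // Returns true when the whole registry should be fetched, false when a delta is enough.
    static boolean needsFullFetch(boolean deltaDisabled,
                                  String singleVipAddress,
                                  boolean forceFull,
                                  int cachedAppCount,
                                  long cachedVersion) {
        return deltaDisabled
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFull
                || cachedAppCount == 0   // first fetch, or nothing cached yet
                || cachedVersion == -1;  // cached data does not support delta
    }

    public static void main(String[] args) {
        System.out.println("first fetch -> full: " + needsFullFetch(false, null, false, 0, 1));
        System.out.println("warm cache  -> full: " + needsFullFetch(false, null, false, 3, 1));
    }
}
```

In the scenario above, a client with a warm cache would normally take the delta branch, and the removal of service A would arrive as part of that delta.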