1. 程式人生 > >註冊中心 Eureka 原始碼解析 —— Eureka-Server 叢集同步

註冊中心 Eureka 原始碼解析 —— Eureka-Server 叢集同步

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 

摘要: 原創出處 http://www.iocoder.cn/Eureka/server-cluster/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 Eureka 1.8.X 版本

  • 1. 概述

  • 2. 叢集節點初始化與更新

    • 2.1 叢集節點啟動

    • 2.2 更新叢集節點資訊

    • 2.3 叢集節點

  • 3. 獲取初始註冊資訊

  • 4. 同步註冊資訊

    • 4.1 同步操作型別

    • 4.2 發起 Eureka-Server 同步操作

    • 4.3 接收 Eureka-Server 同步操作

    • 4.4 處理 Eureka-Server 同步結果

1. 概述

本文主要分享 Eureka-Server 叢集同步註冊資訊

Eureka-Server 叢集如下圖:

640
  • Eureka-Server 叢集不區分主從節點或者 Primary & Secondary 節點,所有節點相同角色( 也就是沒有角色 ),完全對等

  • Eureka-Client 可以向任意 Eureka-Client 發起任意讀寫操作,Eureka-Server 將操作複製到另外的 Eureka-Server 以達到最終一致性。注意,Eureka-Server 是選擇了 AP 的元件。

Eureka-Server 可以使用直接配置所有節點的服務地址,或者基於 DNS 配置。推薦閱讀:《Spring Cloud構建微服務架構(六)高可用服務註冊中心》 。

本文主要類在 com.netflix.eureka.cluster 包下。

OK,讓我們開始愉快的遨遊在程式碼的海洋。

推薦 Spring Cloud 書籍

  • 請支援正版。下載盜版,等於主動編寫低階 BUG 。

  • 程式猿DD —— 《Spring Cloud微服務實戰》

  • 周立 —— 《Spring Cloud與Docker微服務架構實戰》

  • 兩書齊買,京東包郵。

推薦 Spring Cloud 視訊

  • Java 微服務實踐 - Spring Boot

  • Java 微服務實踐 - Spring Cloud

  • Java 微服務實踐 - Spring Boot / Spring Cloud

ps :注意,本文提到的同步,準確來說是複製( Replication )

2. 叢集節點初始化與更新

com.netflix.eureka.cluster.PeerEurekaNodes ,Eureka-Server 叢集節點集合 。構造方法如下 :

public class PeerEurekaNodes {

    private static final Logger logger = LoggerFactory.getLogger(PeerEurekaNodes.class);

    /**
     * 應用例項登錄檔
     */

    protected final PeerAwareInstanceRegistry registry;
    /**
     * Eureka-Server 配置
     */

    protected final EurekaServerConfig serverConfig;
    /**
     * Eureka-Client 配置
     */

    protected final EurekaClientConfig clientConfig;
    /**
     * Eureka-Server 編解碼
     */

    protected final ServerCodecs serverCodecs;
    /**
     * 應用例項資訊管理器
     */

    private final ApplicationInfoManager applicationInfoManager;

    /**
     * Eureka-Server 叢集節點陣列
     */

    private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
    /**
     * Eureka-Server 服務地址陣列
     */

    private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();

    /**
     * 定時任務服務
     */

    private ScheduledExecutorService taskExecutor;

    @Inject
    public PeerEurekaNodes(
            PeerAwareInstanceRegistry registry,
            EurekaServerConfig serverConfig,
            EurekaClientConfig clientConfig,
            ServerCodecs serverCodecs,
            ApplicationInfoManager applicationInfoManager)
 
{
        this.registry = registry;
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;
        this.applicationInfoManager = applicationInfoManager;
    }
}
  • peerEurekaNodespeerEurekaNodeUrlstaskExecutor 屬性,在構造方法中未設定和初始化,而是在 PeerEurekaNodes#start() 方法,設定和初始化,下文我們會解析這個方法。

  • Eureka-Server 在初始化時,呼叫 EurekaBootStrap#getPeerEurekaNodes(…) 方法,建立 PeerEurekaNodes ,點選 連結 檢視該方法的實現。

2.1 叢集節點啟動

呼叫 PeerEurekaNodes#start() 方法,叢集節點啟動,主要完成兩個邏輯:

  • 初始化叢集節點資訊

  • 初始化固定週期( 預設:10 分鐘,可配置 )更新叢集節點資訊的任務

程式碼如下:

  1public void start() {
  2:     // 建立 定時任務服務
  3:     taskExecutor = Executors.newSingleThreadScheduledExecutor(
  4:             new ThreadFactory() {
  5:                 @Override
  6:                 public Thread newThread(Runnable r) {
  7:                     Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
  8:                     thread.setDaemon(true);
  9:                     return thread;
 10:                 }
 11:             }
 12:     );
 13:     try {
 14:         // 初始化 叢集節點資訊
 15:         updatePeerEurekaNodes(resolvePeerUrls());
 16:         // 初始化 初始化固定週期更新叢集節點資訊的任務
 17:         Runnable peersUpdateTask = new Runnable() {
 18:             @Override
 19:             public void run() {
 20:                 try {
 21:                     updatePeerEurekaNodes(resolvePeerUrls());
 22:                 } catch (Throwable e) {
 23:                     logger.error("Cannot update the replica Nodes", e);
 24:                 }
 25
 26:             }
 27:         };
 28:         taskExecutor.scheduleWithFixedDelay(
 29:                 peersUpdateTask,
 30:                 serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
 31:                 serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
 32:                 TimeUnit.MILLISECONDS
 33:         );
 34:     } catch (Exception e) {
 35:         throw new IllegalStateException(e);
 36:     }
 37:     // 列印 叢集節點資訊
 38:     for (PeerEurekaNode node : peerEurekaNodes) {
 39:         logger.info("Replica node URL:  " + node.getServiceUrl());
 40:     }
 41: }
  • 第 15 行 && 第 21 行 :呼叫 #updatePeerEurekaNodes() 方法,更新叢集節點資訊。

2.2 更新叢集節點資訊

呼叫 #resolvePeerUrls() 方法,獲得 Eureka-Server 叢集服務地址陣列,程式碼如下:

  1protected List<String> resolvePeerUrls() {
  2:     // 獲得 Eureka-Server 叢集服務地址陣列
  3:     InstanceInfo myInfo = applicationInfoManager.getInfo();
  4:     String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
  5:     List<String> replicaUrls = EndpointUtils.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
  6
  7:     // 移除自己(避免向自己同步)
  8:     int idx = 0;
  9:     while (idx < replicaUrls.size()) {
 10:         if (isThisMyUrl(replicaUrls.get(idx))) {
 11:             replicaUrls.remove(idx);
 12:         } else {
 13:             idx++;
 14:         }
 15:     }
 16:     return replicaUrls;
 17: }
  • 第 2 至 5 行 :獲得 Eureka-Server 叢集服務地址陣列。EndpointUtils#getDiscoveryServiceUrls(…) 方法,邏輯與 《Eureka 原始碼解析 —— EndPoint 與 解析器》「3.4 ConfigClusterResolver」 基本類似。EndpointUtils 正在逐步,猜測未來這裡會替換。

  • 第 7 至 15 行 :移除自身節點,避免向自己同步。

呼叫 #updatePeerEurekaNodes() 方法,更新叢集節點資訊,主要完成兩部分邏輯:

  • 新增新增的叢集節點

  • 關閉刪除的叢集節點

程式碼如下:

  1protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
  2:     if (newPeerUrls.isEmpty()) {
  3:         logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
  4:         return;
  5:     }
  6
  7:     // 計算 新增的叢集節點地址
  8:     Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
  9:     toShutdown.removeAll(newPeerUrls);
 10
 11:     // 計算 刪除的叢集節點地址
 12:     Set<String> toAdd = new HashSet<>(newPeerUrls);
 13:     toAdd.removeAll(peerEurekaNodeUrls);
 14
 15:     if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
 16:         return;
 17:     }
 18
 19:     // 關閉刪除的叢集節點
 20:     // Remove peers no long available
 21:     List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
 22:     if (!toShutdown.isEmpty()) {
 23:         logger.info("Removing no longer available peer nodes {}", toShutdown);
 24:         int i = 0;
 25:         while (i < newNodeList.size()) {
 26:             PeerEurekaNode eurekaNode = newNodeList.get(i);
 27:             if (toShutdown.contains(eurekaNode.getServiceUrl())) {
 28:                 newNodeList.remove(i);
 29:                 eurekaNode.shutDown(); // 關閉
 30:             } else {
 31:                 i++;
 32:             }
 33:         }
 34:     }
 35
 36:     // 新增新增的叢集節點
 37:     // Add new peers
 38:     if (!toAdd.isEmpty()) {
 39:         logger.info("Adding new peer nodes {}", toAdd);
 40:         for (String peerUrl : toAdd) {
 41:             newNodeList.add(createPeerEurekaNode(peerUrl));
 42:         }
 43:     }
 44
 45:     // 賦值
 46:     this.peerEurekaNodes = newNodeList;
 47:     this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
 48: }
  • 第 7 至 9 行 :計算新增的叢集節點地址。

  • 第 11 至 13 行 :計算刪除的叢集節點地址。

  • 第 19 至 34 行 :關閉刪除的叢集節點。

  • 第 36 至 43 行 :新增新增的叢集節點。呼叫 #createPeerEurekaNode(peerUrl) 方法,建立叢集節點,程式碼如下:

      1protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
      2:     HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
      3:     String targetHost = hostFromUrl(peerEurekaNodeUrl);
      4:     if (targetHost == null) {
      5:         targetHost = "host";
      6:     }
      7:     return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
      8: }
    • 第 2 行 :建立 Eureka-Server 叢集通訊客戶端,在 《Eureka 原始碼解析 —— 網路通訊》「4.2 JerseyReplicationClient」 有詳細解析。

    • 第 7 行 :建立 PeerEurekaNode ,在 「2.3 PeerEurekaNode」 有詳細解析。

2.3 叢集節點

com.netflix.eureka.cluster.PeerEurekaNode ,單個叢集節點。

點選 連結 檢視構造方法

  • 第 129 行 :建立 ReplicationTaskProcessor 。在 「4.1.2 同步操作任務處理器」 詳細解析

  • 第 131 至 140 行 :建立批量任務分發器,在 《Eureka 原始碼解析 —— 任務批處理》 有詳細解析。

  • 第 142 至 151 行 :建立單任務分發器,用於 Eureka-Server 向亞馬遜 AWS 的 ASG ( Autoscaling Group ) 同步狀態。暫時跳過。

3. 獲取初始註冊資訊

Eureka-Server 啟動時,呼叫 PeerAwareInstanceRegistryImpl#syncUp() 方法,從叢集的一個 Eureka-Server 節點獲取初始註冊資訊,程式碼如下:

  1@Override
  2public 

相關推薦

註冊中心 Eureka 原始碼解析 —— Eureka-Server 叢集同步

點選上方“芋道原始碼”,選擇“置頂公眾號”技術文章第一時間送達!原始碼精品專欄 摘要: 原創出處

註冊中心 Eureka 原始碼解析 —— Eureka-Client 初始化(三)之 EurekaClient

本文主要基於 Eureka 1.8.X 版本1. 概述2. EurekaClient2.1 Lo

註冊中心 Eureka 原始碼解析 —— Eureka-Client 初始化(一)之 EurekaInstanceConfig

本文主要基於 Eureka 1.8.X 版本1. 概述2. EurekaInstanceConf

Eureka 原始碼解析 —— 應用例項註冊發現(六)之全量獲取

// AbstractJerseyEurekaHttpClient.java@Overridepublic EurekaHttpResponse<Applications> getApplications(String... regions) { return getApplicationsI

Eureka 原始碼解析 —— 應用例項註冊發現(一)之註冊

// DiscoveryClient.javaboolean register() throws Throwable { logger.info(PREFIX + appPathIdentifier + ": registering service..."); EurekaHttpResponse&l

深入理解Eureka Server叢集同步(十)

叢集啟動同步 protected void initEurekaServerContext() throws Exception {   // ....省略N多程式碼 // 同步資訊   int registryCount = this.registr

Eureka原始碼解析與配置

    Eureka只要分為2部分,一個server,一個是client(包含生產者和消費者)。    Eureka client: ①從@EnableEurekaClient註解開始看    ②檢視@EnableDiscoveryClient    ③EnableDisco

Spring Cloud系列(三):Eureka原始碼解析之服務端

一、自動裝配   1、根據自動裝配原理(詳見:Spring Boot系列(二):Spring Boot自動裝配原理解析),找到spring-cloud-starter-netflix-eureka-server.jar的spring.factories,檢視spring.factories如下:   2、進

registerReceiver 動態註冊與 sendBroadcast 原始碼解析

    廣播的註冊分為動態註冊和靜態註冊,靜態註冊主要在開機後PackageManagerService 利用 AndroidManifest 掃描 安裝的apk 獲取AndroidManifest內註冊的 廣播 所以 忽略 靜態註冊。今天主要介紹 動態廣播的註冊。  

Redis原始碼解析:28叢集(四)手動故障轉移、從節點遷移

一:手動故障轉移          Redis叢集支援手動故障轉移。也就是向從節點發送”CLUSTER  FAILOVER”命令,使其在主節點未下線的情況下,發起故障轉移流程,升級為新的主節點,而原來的主節點降級為從節點。          為了不丟失資料,向從節點發送”C

Redis原始碼解析:27叢集(三)主從複製、故障轉移

一:主從複製          在叢集中,為了保證叢集的健壯性,通常設定一部分叢集節點為主節點,另一部分叢集節點為這些主節點的從節點。一般情況下,需要保證每個主節點至少有一個從節點。          叢集初始化時,每個叢集節點都是以獨立的主節點角色而存在的,通過向叢集節點

配置中心 Apollo 原始碼解析 —— 客戶端 API 配置(三)之 ConfigFile

������關注微信公眾號:【芋道原始碼】有福利: 1. RocketMQ / MyCAT / Sharding-JDBC 所有原始碼分析文章列表 2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋

Redis原始碼解析:25叢集(一)握手、心跳訊息以及下線檢測

         Redis叢集是Redis提供的分散式資料庫方案,通過分片來進行資料共享,並提供複製和故障轉移功能。 一:初始化 1:資料結構 在原始碼中,通過server.cluster記錄整個叢集當前的狀態,比如叢集中的所有節點;叢集目前的狀態,比如是上線還是下線;

剖析nsq訊息佇列(二) 去中心原始碼解析

在上一篇帖子剖析nsq訊息佇列(一) 簡介及去中心化實現原理中,我介紹了nsq的兩種使用方式,一種是直接連線,還有一種是通過nslookup來實現去中心化的方式使用,並大概說了一下實現原理,沒有什麼難理解的東西,這篇帖子我把nsq實現去中心化的原始碼和其中的業物邏輯展示給大家看一下。 nsqd和nsqlook

Java併發程式設計高階技術-高效能併發框架原始碼解析與實戰目前同步

第1章 課程介紹(Java併發程式設計進階課程) 什麼是Disruptor?它一個高效能的非同步處理框架,號稱“單執行緒每秒可處理600W個訂單”的神器,本課程目標:徹底精通一個如此優秀的開源框架,面試秒殺面試官。本章會帶領小夥伴們先了解課程大綱與重點,然後模擬千萬,億級

Codis原始碼解析——sentinel的重同步(2)

Topom.ha.monitor本身相當於一個上帝視角的sentinel。它本身並不是一個實際的sentinel伺服器,但是它負責收集各個sentinel的監控資訊,並對叢集作出反饋。這一講我們就來看看Topom.ha.monitor。這一篇的原始碼也有助於大家

搭建 Spring Cloud Eureka Server 高可用註冊中心叢集

什麼是註冊中心 Eureka Server 在微服務中承擔的角色是服務註冊中心,也是最最基礎的核心設施之一。從“Eureka”單詞的含義**“我找到了!我發現了!”可以看出,其實 Eureka 就是用來實現服務註冊、服務發現的工具**,和 dubbo 這類的分散式服務治理框架類似。各

Spring Cloud Eureka 2 (Eureka Server搭建服務註冊中心)

class XML bsp gist client intellij 嘗試 ati register 工具:IntelliJ IDEA 2017.1.2 x64、maven3.3.9 打開IDE file===>new===>project next

註冊中心 Eureka 源碼解析 —— 調試環境搭建(含源碼)

Java 架構 依賴工具GradleJDKIntelliJ IDEA 源碼拉取https://github.com/Netflix/eureka.git使用 IntelliJ IDEA 從 Fork 出來的倉庫拉取代碼。拉取完成後,Gradle 會下載依賴包,可能會花費一些時間,耐心等待下。 本文基

SpringCloud 註冊與發現 之 註冊中心高可用叢集 Eureka

SpringCloud 註冊與發現 之 註冊中心高可用叢集 Eureka    1、註冊中心的存在 服務提供者啟動會註冊到註冊中心,並會定期發起心跳通知,告知註冊中心“我還活著,別把我幹掉”。 服務消費者到註冊中心訂閱服務,並拉取呼叫的服務列表到本地