深入理解Eureka Server叢集同步(十)
叢集啟動同步
protected void initEurekaServerContext() throws Exception { // ....省略N多程式碼 // 同步資訊 int registryCount = this.registry.syncUp(); // ....省略N多程式碼 }
void initEurekaServerContext() throws Exception { // ....省略N多程式碼 // 同步資訊 int registryCount = this.registry.syncUp(); // ....省略N多程式碼}
網上很多文章說是呼叫 syncUp() 這個方法去其他 Eureka Server 節點複製註冊資訊,這個說法不是很準確。在這個地方,syncUp() 這個方法並不會直接去其他 Eureka Server 節點複製資訊,而是從本地記憶體裡面(即本節點內嵌的 Eureka Client 通過 eurekaClient.getApplications() 拿到的註冊表)獲取註冊資訊,看原始碼就知道了。
/**
 * Populates this server's registry from the registry snapshot held by the local Eureka client.
 *
 * <p>This method does not contact other servers itself: it reads
 * {@code eurekaClient.getApplications()} and registers every instance accepted by
 * {@code isRegisterable}. Failures for individual instances are logged and skipped.
 *
 * <p>While nothing has been registered, the whole pass is repeated, up to
 * {@code serverConfig.getRegistrySyncRetries()} passes in total, sleeping
 * {@code serverConfig.getRegistrySyncRetryWaitMs()} before every pass after the first.
 * If the sleep is interrupted, the interrupt flag is restored and the count so far is returned.
 *
 * @return the number of instances registered
 */
public int syncUp() {
    // Number of instances registered so far.
    int count = 0;

    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                // Wait before each retry (the second pass onward).
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // Registry snapshot cached by the local Eureka client.
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        // Register into this server; the last argument is isReplication = true.
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}
/**
 * Populates this server's registry from the registry snapshot held by the local Eureka client.
 *
 * <p>This method does not contact other servers itself: it reads
 * {@code eurekaClient.getApplications()} and registers every instance accepted by
 * {@code isRegisterable}. Failures for individual instances are logged and skipped.
 *
 * <p>While nothing has been registered, the whole pass is repeated, up to
 * {@code serverConfig.getRegistrySyncRetries()} passes in total, sleeping
 * {@code serverConfig.getRegistrySyncRetryWaitMs()} before every pass after the first.
 * If the sleep is interrupted, the interrupt flag is restored and the count so far is returned.
 *
 * @return the number of instances registered
 */
public int syncUp() {
    // Number of instances registered so far.
    int count = 0;

    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                // Wait before each retry (the second pass onward).
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // Registry snapshot cached by the local Eureka client.
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        // Register into this server; the last argument is isReplication = true.
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}
引數說明:
registrySyncRetries : 當 Eureka 伺服器啟動時嘗試去獲取叢集裡其他伺服器上的註冊資訊的次數,預設為 5。
只有當 eureka.client.register-with-eureka = true 的時候才會是 5;如果是 false,則為 0。
registrySyncRetryWaitMs : 當eureka伺服器啟動時獲取其他伺服器的註冊資訊失敗時,會再次嘗試獲取,期間需要等待的時間,預設為30 * 1000毫秒
count : 獲取到的註冊例項數量,如果為 0 則根據重試次數進行重試,每次重試前等待 30 秒(即 registrySyncRetryWaitMs 的預設值)。
PS: 在之前的文章 https://blog.csdn.net/u012394095/article/details/80882684 中講過,Eureka Client 啟動的時候預設會自動從 Eureka Server 獲取註冊資訊。要想讓 Eureka Server 在啟動的時候可以同步其他叢集節點的註冊資訊,必須開啟以下客戶端配置:
## 是否作為一個 Eureka Client 註冊到 Eureka Server 上去
eureka.client.register-with-eureka = true
## 是否需要從 Eureka Server 上拉取註冊資訊到本地
eureka.client.fetch-registry = true
## 是否作為一個 Eureka Client 註冊到 Eureka Server 上去
eureka.client.register-with-eureka = true
## 是否需要從 Eureka Server 上拉取註冊資訊到本地
eureka.client.fetch-registry = true
只有開啟了上面兩個配置,叢集節點在啟動的時候才會初始化 Eureka Client 端的配置,並從其他 Eureka Server 拉取註冊資訊到本地;
之後在初始化 Eureka Server 的時候,再從本地記憶體裡面讀取這些註冊資訊,自動註冊到自身的服務上。
叢集同步型別
/**
 * Kinds of registry changes that are replicated between Eureka Server peers.
 * Each constant owns a servo {@code Timer} named after the constant.
 */
public enum Action {
    /** Heartbeat (lease renewal). */
    Heartbeat,
    /** Instance registration. */
    Register,
    /** Instance cancellation (going offline). */
    Cancel,
    /** Setting an overridden instance status. */
    StatusUpdate,
    /** Deleting an overridden instance status. */
    DeleteStatusOverride;

    // Created once per constant and never reassigned.
    private final com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());

    /** Returns the timer associated with this action. */
    public com.netflix.servo.monitor.Timer getTimer() {
        return this.timer;
    }
}
Heartbeat : 心跳續約
Register : 註冊
Cancel : 下線
StatusUpdate : 設定(更新)覆蓋狀態
DeleteStatusOverride : 刪除覆蓋狀態
/**
 * Kinds of registry changes that are replicated between Eureka Server peers.
 * Each constant owns a servo {@code Timer} named after the constant.
 */
enum Action {
    /** Heartbeat (lease renewal). */
    Heartbeat,
    /** Instance registration. */
    Register,
    /** Instance cancellation (going offline). */
    Cancel,
    /** Setting an overridden instance status. */
    StatusUpdate,
    /** Deleting an overridden instance status. */
    DeleteStatusOverride;

    // Created once per constant and never reassigned.
    private final com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());

    /** Returns the timer associated with this action. */
    public com.netflix.servo.monitor.Timer getTimer() {
        return this.timer;
    }
}
Heartbeat : 心跳續約
Register : 註冊
Cancel : 下線
StatusUpdate : 設定(更新)覆蓋狀態
DeleteStatusOverride : 刪除覆蓋狀態
發起同步
這裡以註冊的程式碼為例
/**
 * Registers the instance locally, then replicates the registration to peers.
 *
 * @param info          the instance being registered
 * @param isReplication true when this call is itself a replication from another peer
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // Honour the instance's own lease duration only when it declares a positive one.
    final boolean hasOwnLease =
            info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0;
    final int leaseDuration = hasOwnLease
            ? info.getLeaseInfo().getDurationInSecs()
            : Lease.DEFAULT_DURATION_IN_SECS;

    // Local registration first...
    super.register(info, leaseDuration, isReplication);
    // ...then fan the change out to peers as a Register action.
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
/**
 * Replicates a registry change to every peer node except this server itself.
 *
 * <p>A request that is already a replication ({@code isReplication == true}) is only
 * counted, never forwarded again; forwarding it would create a poison replication loop.
 * With no peers the loop simply does nothing.
 *
 * @param action        kind of change to replicate; its timer measures this call
 * @param appName       application name of the affected instance
 * @param id            instance id
 * @param info          the instance, when the action needs it (may be null)
 * @param newStatus     the new status, when the action needs it (may be null)
 * @param isReplication whether this call originated from another peer
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            // Count replication requests received (per the field name: over the last minute).
            numberOfReplicationsLastMin.increment();
            // If it is a replication already, do not replicate again as this will create a poison replication
            return;
        }
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // Skip the entry that points at this server itself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
/**
 * Registers the instance locally, then replicates the registration to peers.
 *
 * @param info          the instance being registered
 * @param isReplication true when this call is itself a replication from another peer
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // Honour the instance's own lease duration only when it declares a positive one.
    final boolean hasOwnLease =
            info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0;
    final int leaseDuration = hasOwnLease
            ? info.getLeaseInfo().getDurationInSecs()
            : Lease.DEFAULT_DURATION_IN_SECS;

    // Local registration first...
    super.register(info, leaseDuration, isReplication);
    // ...then fan the change out to peers as a Register action.
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
/**
 * Replicates a registry change to every peer node except this server itself.
 *
 * <p>A request that is already a replication ({@code isReplication == true}) is only
 * counted, never forwarded again; forwarding it would create a poison replication loop.
 * With no peers the loop simply does nothing.
 *
 * @param action        kind of change to replicate; its timer measures this call
 * @param appName       application name of the affected instance
 * @param id            instance id
 * @param info          the instance, when the action needs it (may be null)
 * @param newStatus     the new status, when the action needs it (may be null)
 * @param isReplication whether this call originated from another peer
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            // Count replication requests received (per the field name: over the last minute).
            numberOfReplicationsLastMin.increment();
            // If it is a replication already, do not replicate again as this will create a poison replication
            return;
        }
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // Skip the entry that points at this server itself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
步驟說明:
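1. 判斷叢集節點是否為空,為空則返回。(注意:原始碼中的 peerEurekaNodes == Collections.EMPTY_LIST 是拿 PeerEurekaNodes 物件與一個 List 常量做引用比較,永遠不會成立;叢集沒有其他節點時,實際上是靠後面的 for 迴圈不執行任何操作來達到同樣的效果。)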
2. isReplication 代表是否是一個複製請求,isReplication = true 表示這是其他 Eureka Server 發過來的同步請求,
此時不需要再繼續往下同步,否則會陷入同步死迴圈。
3.迴圈叢集節點,過濾掉自身的節點
4.發起同步請求 ,呼叫replicateInstanceActionsToPeers
PS: 這裡提到了PeerEurekaNode , 對於PeerEurekaNodes的叢集節點更新及資料讀取,可以看這個
https://blog.csdn.net/u012394095/article/details/80693681 。在服務啟動的時候,會開啟執行緒定時更新 PeerEurekaNodes 的叢集節點資訊,每 15 分鐘一次。
/**
 * Sends a single registry change to one peer node.
 *
 * <p>Any failure is logged and swallowed, so one unreachable peer does not stop
 * replication to the others.
 *
 * @param action    kind of change to send
 * @param appName   application name of the affected instance
 * @param id        instance id
 * @param info      the instance as received (used by Register)
 * @param newStatus the new overridden status (used by StatusUpdate)
 * @param node      the target peer
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        // Mark the current request as protocol version V2 before dispatching.
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel: {
                // Going offline: only app name and id are needed.
                node.cancel(appName, id);
                break;
            }
            case Heartbeat: {
                // Renewal: include the overridden status and this node's latest copy of the instance.
                InstanceStatus overridden = overriddenInstanceStatusMap.get(id);
                InstanceInfo localCopy = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, localCopy, overridden, false);
                break;
            }
            case Register: {
                // Registration: forward the instance exactly as received.
                node.register(info);
                break;
            }
            case StatusUpdate: {
                // Set the overridden status on the peer.
                InstanceInfo localCopy = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, localCopy);
                break;
            }
            case DeleteStatusOverride: {
                // Remove the overridden status on the peer.
                InstanceInfo localCopy = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, localCopy);
                break;
            }
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
/**
 * Sends a single registry change to one peer node.
 *
 * <p>Any failure is logged and swallowed, so one unreachable peer does not stop
 * replication to the others.
 *
 * @param action    kind of change to send
 * @param appName   application name of the affected instance
 * @param id        instance id
 * @param info      the instance as received (used by Register)
 * @param newStatus the new overridden status (used by StatusUpdate)
 * @param node      the target peer
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        // Mark the current request as protocol version V2 before dispatching.
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel: {
                // Going offline: only app name and id are needed.
                node.cancel(appName, id);
                break;
            }
            case Heartbeat: {
                // Renewal: include the overridden status and this node's latest copy of the instance.
                InstanceStatus overridden = overriddenInstanceStatusMap.get(id);
                InstanceInfo localCopy = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, localCopy, overridden, false);
                break;
            }
            case Register: {
                // Registration: forward the instance exactly as received.
                node.register(info);
                break;
            }
            case StatusUpdate: {
                // Set the overridden status on the peer.
                InstanceInfo localCopy = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, localCopy);
                break;
            }
            case DeleteStatusOverride: {
                // Remove the overridden status on the peer.
                InstanceInfo localCopy = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, localCopy);
                break;
            }
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
這裡直接看註冊,其他的原理上是一致的。
PeerEurekaNode的register方法如下。
/**
 * Queues a Register replication task for this peer on the batching dispatcher.
 *
 * @param info the instance to register on the peer
 */
public void register(final InstanceInfo info) throws Exception {
    // Expiry time handed to the dispatcher: now + getLeaseRenewalOf(info).
    final long expiresAt = System.currentTimeMillis() + getLeaseRenewalOf(info);
    final String registerTaskId = taskId("register", info);
    final InstanceReplicationTask registerTask =
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            };
    // Batch processing is the default: the task is queued rather than sent immediately.
    batchingDispatcher.process(registerTaskId, registerTask, expiresAt);
}
/**
 * Queues a Register replication task for this peer on the batching dispatcher.
 *
 * @param info the instance to register on the peer
 */
public void register(final InstanceInfo info) throws Exception {
    // Expiry time handed to the dispatcher: now + getLeaseRenewalOf(info).
    final long expiresAt = System.currentTimeMillis() + getLeaseRenewalOf(info);
    final String registerTaskId = taskId("register", info);
    final InstanceReplicationTask registerTask =
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            };
    // Batch processing is the default: the task is queued rather than sent immediately.
    batchingDispatcher.process(registerTaskId, registerTask, expiresAt);
}
預設採用的是批量任務處理器:將 task 放入任務佇列中,然後仿照 ThreadPoolExecutor 的方式建立執行緒,由執行緒
從佇列裡面抓取任務、統一批量執行;Eureka Server 那邊也是統一批量接收,這樣提高了同步效率。
批量處理的任務執行器是com.netflix.eureka.cluster.ReplicationTaskProcessor
/**
 * Sends a batch of replication tasks to the peer in a single request and classifies the outcome.
 *
 * @param tasks the tasks to send
 * @return Success when the batch was accepted (per-task results are passed to
 *         handleBatchResponse); Congestion on HTTP 503; TransientError on network-connect
 *         failures; PermanentError otherwise
 */
@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
    // Wrap each task as a ReplicationInstance inside one ReplicationList payload.
    ReplicationList batch = createReplicationListOf(tasks);
    try {
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(batch);
        int status = response.getStatusCode();

        if (isSuccess(status)) {
            // Hand the per-task results to handleBatchResponse.
            handleBatchResponse(tasks, response.getEntity().getResponseList());
            return ProcessingResult.Success;
        }
        if (status == 503) {
            logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
            return ProcessingResult.Congestion;
        }
        // Unexpected error returned from the server. This should ideally never happen.
        logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", status, tasks.size());
        return ProcessingResult.PermanentError;
    } catch (Throwable e) {
        if (!isNetworkConnectException(e)) {
            logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
            return ProcessingResult.PermanentError;
        }
        logNetworkErrorSample(null, e);
        return ProcessingResult.TransientError;
    }
}
/**
 * Sends a batch of replication tasks to the peer in a single request and classifies the outcome.
 *
 * @param tasks the tasks to send
 * @return Success when the batch was accepted (per-task results are passed to
 *         handleBatchResponse); Congestion on HTTP 503; TransientError on network-connect
 *         failures; PermanentError otherwise
 */
@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
    // Wrap each task as a ReplicationInstance inside one ReplicationList payload.
    ReplicationList batch = createReplicationListOf(tasks);
    try {
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(batch);
        int status = response.getStatusCode();

        if (isSuccess(status)) {
            // Hand the per-task results to handleBatchResponse.
            handleBatchResponse(tasks, response.getEntity().getResponseList());
            return ProcessingResult.Success;
        }
        if (status == 503) {
            logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
            return ProcessingResult.Congestion;
        }
        // Unexpected error returned from the server. This should ideally never happen.
        logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", status, tasks.size());
        return ProcessingResult.PermanentError;
    } catch (Throwable e) {
        if (!isNetworkConnectException(e)) {
            logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
            return ProcessingResult.PermanentError;
        }
        logNetworkErrorSample(null, e);
        return ProcessingResult.TransientError;
    }
}
請求批量處理的介面地址 : peerreplication/batch/
handleBatchResponse(tasks, response.getEntity().getResponseList()) 會迴圈處理每個任務的結果:
成功則呼叫 handleSuccess,失敗則呼叫 handleFailure。比如 heartbeat 的時候,若呼叫返回碼為
404,會重新發起註冊。
ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
@Override
public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
}
@Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
if (statusCode == 404) {
// 重新發起註冊。
register(info);
}
} else if (config.shouldSyncWhenTimestampDiffers()) {
InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
if (peerInstanceInfo != null) {
syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
}
}
}
};
ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
@Override
public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
}
@Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
if (statusCode == 404) {
// 重新發起註冊。
register(info);
}
} else if (config.shouldSyncWhenTimestampDiffers()) {
InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
if (peerInstanceInfo != null) {
syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
}
}
}
};
Eureka Server接收同步
程式入口 : com.netflix.eureka.resources.PeerReplicationResource
@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
try {
ReplicationListResponse batchResponse = new ReplicationListResponse();
// 迴圈請求的任務
for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
try {
// 分發任務,同時將處理結果收集起來,等會統一返回
batchResponse.addResponse(dispatch(instanceInfo));
} catch (Exception e) {
batchResponse.addResponse(new ReplicationInstanceResponse(Status.IN