MQTT---HiveMQ原始碼詳解(二十)Cluster-Replicate/VectorClock
MQTT交流群:221405150
17章中我們講解了整個HiveMQ的Cluster的原理以及實現方式,值得一提的當然是資料的Replicate,以及當Replicate資料與本地資料存在衝突時,HiveMQ是如何實現的。
Replicate
在每一條被 running node持久化的資料都會使用Primary環 Replicate。
當node從JOINING狀態變更為RUNNING狀態前都會使用Primary環 Replicate。
當node變更為MERGE_MINORITY成功後都會使用Primary環 Replicate。
當node變更為MERGE_MINORITY成功後都會使用Minority環 Replicate。
VectorClock
幾乎所有做Cluster不可避免的就是需要解決衝突,各種解法比較多,其中VectorClock做法比較流行,下面我們看看HiveMQ如何實現即可,具體使用VectorClock的原因、以及原理我們就不過多描述了。
注意:VectorClocks上的成員變數為Map<String, VectorClock>,
貌似是idea uml外掛顯示問題。
VectorClock持有vectors的一個node與vector對應關係,提供遞增、合併、比較這幾種功能/服務,以記錄一個key在一個node上的Vector。
VectorClocks持有每一個key的VectorClock,為每一個key提供新增、刪除、get等方法/服務, 通過VectorClocks就可以獲得到每個key的完整的VectorClock。
在node獲得到Replicate要求時,當本地資料與備份資料存在衝突時,就會使用VectorClock來進行解決衝突。
在每個ClusterPersistence中都會持有一個VectorClocks用以解決衝突。
示例
下面我們就列舉一段ClientSessionClusterPersistenceImpl處理Replica請求時,當存在衝突解決衝突的程式碼。
public ListenableFuture<Void> handleReplica(@NotNull String clientId, @NotNull ClientSession clientSession, long requestTimestamp, VectorClock requestVectorClock) {
Preconditions.checkNotNull(clientId, "Client id must not be null");
Preconditions.checkNotNull(clientSession, "Client session must not be null");
return getExecutor(clientId).add(() -> {
VectorClock localVectorClock = vectorClocks.get(clientId);
//當請求的向量時鐘在比本地向量時鐘之前或者相當,則忽略本次備份
if (requestVectorClock.before(localVectorClock) ||
requestVectorClock.equals(localVectorClock)) {
return null;
}
//當本地向量時鐘在請求向量時鐘之前,則直接儲存即可
if (localVectorClock.before(requestVectorClock)) {
vectorClocks.put(clientId, requestVectorClock);
clientSessionLocalPersistence.persistent(clientId, clientSession, requestTimestamp);
} else {
//當兩個向量時鐘一致,則合併解決衝突
localVectorClock.merge(requestVectorClock);
localVectorClock.increment(clusterConnection.getClusterId());
vectorClocks.put(clientId, localVectorClock);
ClientSession localClientSession = clientSessionLocalPersistence.get(clientId);
if (!localClientSession.isConnected() && clientSession.isConnected()) {
clientSessionLocalPersistence.persistent(clientId, clientSession, requestTimestamp);
}
}
return null;
});
}