Nacos一致性協議實現之Distro協議淺析
阿新 • • 發佈:2019-12-31
原文連結:www.liaochuntao.cn/2019/09/16/…
前期導讀
Nacos 中的 DistroConsistencyServiceImpl 工作淺析
之前的文章說的很淺顯,這次打算重頭好好解析下Nacos
中使用的alibaba
自研的AP
協議——Distro
核心程式碼實現
Nacos Naming 模組啟動做的時資料同步
DistroConsistencyServiceImpl
public void load() throws Exception {
if (SystemUtils.STANDALONE_MODE) {
initialized = true;
return ;
}
// size = 1 means only myself in the list,we need at least one another server alive:
// 叢集模式下,需要等待至少兩個節點才可以將邏輯進行
while (serverListManager.getHealthyServers().size() <= 1) {
Thread.sleep(1000L);
Loggers.DISTRO.info("waiting server list init...");
}
// 獲取所有健康的叢集節點
for (Server server : serverListManager.getHealthyServers()) {
// 自己則不需要進行資料同步廣播操作
if (NetUtils.localServer().equals(server.getKey())) {
continue;
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync from " + server);
}
// 從別的伺服器進行全量資料拉取操作,只需要執行一次即可,剩下的交由增量同步任務去完成
if (syncAllDataFromRemote(server)) {
initialized = true;
return ;
}
}
}
複製程式碼
全量資料拉取的動作
資料拉取執行者的動作
public boolean syncAllDataFromRemote(Server server) {
try {
// 獲取資料
byte[] data = NamingProxy.getAllData(server.getKey());
// 接收到的資料進行處理
processData(data);
return true;
} catch (Exception e) {
Loggers.DISTRO.error("sync full data from " + server + " failed!",e);
return false;
}
}
複製程式碼
資料提供者的響應
@RequestMapping(value = "/datums",method = RequestMethod.GET)
public ResponseEntity getAllDatums(HttpServletRequest request,HttpServletResponse response) throws Exception {
// 直接將儲存的資料容器——Map進行序列化傳輸
String content = new String(serializer.serialize(dataStore.getDataMap()),StandardCharsets.UTF_8);
return ResponseEntity.ok(content);
}
複製程式碼
接下來,當從某一個Server Node
拉取了全量資料後的操作
public void processData(byte[] data) throws Exception {
if (data.length > 0) {
// 先將資料進行反序列化
Map<String,Datum<Instances>> datumMap =
serializer.deserializeMap(data,Instances.class);
// 對資料進行遍歷處理
for (Map.Entry<String,Datum<Instances>> entry : datumMap.entrySet()) {
// 資料放入資料儲存容器——DataStore中
dataStore.put(entry.getKey(),entry.getValue());
// 判斷監聽器是否包含了對這個Key的監聽,如果沒有,表明是一個新的資料
if (!listeners.containsKey(entry.getKey())) {
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral()) {
// create empty service
Loggers.DISTRO.info("creating service {}",entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed,exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
// 回撥 Listener 監聽器,告知新的Service資料
listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
.onChange(KeyBuilder.buildServiceMetaKey(namespaceId,serviceName),service);
}
}
}
// 進行 Listener 的監聽回撥
for (Map.Entry<String,Datum<Instances>> entry : datumMap.entrySet()) {
if (!listeners.containsKey(entry.getKey())) {
// Should not happen:
Loggers.DISTRO.warn("listener of {} not found.",entry.getKey());
continue;
}
try {
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(),entry.getValue().value);
}
} catch (Exception e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}",entry.getKey(),e);
continue;
}
// Update data store if listener executed successfully:
dataStore.put(entry.getKey(),entry.getValue());
}
}
}
複製程式碼
到這裡,Nacos Naming
模組的Distro
協議的初次啟動時的資料全量同步到這裡就告一段落了,接下來就是資料的增量同步了,首先要介紹一個Distro
協議的一個概念——權威Server
權威
Server
的判斷器
public class DistroMapper implements ServerChangeListener {
private List<String> healthyList = new ArrayList<>();
public List<String> getHealthyList() {
return healthyList;
}
@Autowired
private SwitchDomain switchDomain;
@Autowired
private ServerListManager serverListManager;
/**
* init server list
*/
@PostConstruct
public void init() {
serverListManager.listen(this);
}
// 判斷該資料是否可以由本節點進行響應
public boolean responsible(Cluster cluster,Instance instance) {
return switchDomain.isHealthCheckEnabled(cluster.getServiceName())
&& !cluster.getHealthCheckTask().isCancelled()
&& responsible(cluster.getServiceName())
&& cluster.contains(instance);
}
// 根據 ServiceName 進行 Hash 計算,找到對應的權威節點的索引,判斷是否是本節點,是的話表明該資料可以由本節點進行處理
public boolean responsible(String serviceName) {
if (!switchDomain.isDistroEnabled() || SystemUtils.STANDALONE_MODE) {
return true;
}
if (CollectionUtils.isEmpty(healthyList)) {
// means distro config is not ready yet
return false;
}
int index = healthyList.indexOf(NetUtils.localServer());
int lastIndex = healthyList.lastIndexOf(NetUtils.localServer());
if (lastIndex < 0 || index < 0) {
return true;
}
int target = distroHash(serviceName) % healthyList.size();
return target >= index && target <= lastIndex;
}
// 根據 ServiceName 找到權威 Server 的地址
public String mapSrv(String serviceName) {
if (CollectionUtils.isEmpty(healthyList) || !switchDomain.isDistroEnabled()) {
return NetUtils.localServer();
}
try {
return healthyList.get(distroHash(serviceName) % healthyList.size());
} catch (Exception e) {
Loggers.SRV_LOG.warn("distro mapper failed,return localhost: " + NetUtils.localServer(),e);
return NetUtils.localServer();
}
}
public int distroHash(String serviceName) {
return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
}
@Override
public void onChangeServerList(List<Server> latestMembers) {
}
@Override
public void onChangeHealthyServerList(List<Server> latestReachableMembers) {
List<String> newHealthyList = new ArrayList<>();
for (Server server : latestReachableMembers) {
newHealthyList.add(server.getKey());
}
healthyList = newHealthyList;
}
}
複製程式碼
上面的元件,就是Distro
協議的一個重要部分,根據資料進行 Hash 計算查詢叢集節點列表中的權威節點
節點間的資料增量同步
public class TaskDispatcher {
@Autowired
private GlobalConfig partitionConfig;
@Autowired
private DataSyncer dataSyncer;
private List<TaskScheduler> taskSchedulerList = new ArrayList<>();
private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();
@PostConstruct
public void init() {
// 構建任務執行器
for (int i = 0; i < cpuCoreCount; i++) {
TaskScheduler taskScheduler = new TaskScheduler(i);
taskSchedulerList.add(taskScheduler);
// 任務排程執行器提交
GlobalExecutor.submitTaskDispatch(taskScheduler);
}
}
public void addTask(String key) {
// 根據 Key 進行 Hash 找到一個 TaskScheduler 進行任務提交
taskSchedulerList.get(UtilsAndCommons.shakeUp(key,cpuCoreCount)).addTask(key);
}
public class TaskScheduler implements Runnable {
private int index;
private int dataSize = 0;
private long lastDispatchTime = 0L;
private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);
public TaskScheduler(int index) {
this.index = index;
}
public void addTask(String key) {
queue.offer(key);
}
public int getIndex() {
return index;
}
@Override
public void run() {
List<String> keys = new ArrayList<>();
while (true) {
try {
// 從任務快取佇列中獲取一個任務(存在超時設定)
String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),TimeUnit.MILLISECONDS);
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
Loggers.DISTRO.debug("got key: {}",key);
}
// 如果不存在叢集或者叢集節點為空
if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
continue;
}
if (StringUtils.isBlank(key)) {
continue;
}
if (dataSize == 0) {
keys = new ArrayList<>();
}
// 進行批量任務處理,這裡做一次暫存操作,為了避免
keys.add(key);
dataSize++;
// 如果此時的任務暫存數量達到了指定的批量,或者任務的時間達到了最大設定,進行資料同步任務
if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
(System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
// 為每一個server建立一個SyncTask任務
for (Server member : dataSyncer.getServers()) {
if (NetUtils.localServer().equals(member.getKey())) {
continue;
}
SyncTask syncTask = new SyncTask();
syncTask.setKeys(keys);
syncTask.setTargetServer(member.getKey());
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
Loggers.DISTRO.debug("add sync task: {}",JSON.toJSONString(syncTask));
}
// 進行任務提交,同時設定任務延遲執行時間,這裡設定為立即執行
dataSyncer.submit(syncTask,0);
}
lastDispatchTime = System.currentTimeMillis();
dataSize = 0;
}
} catch (Exception e) {
Loggers.DISTRO.error("dispatch sync task failed.",e);
}
}
}
}
}
複製程式碼
DataSyncer 資料同步任務的真正執行者
public class DataSyncer {
...
@PostConstruct
public void init() {
// 執行定期的資料同步任務(每五秒執行一次)
startTimedSync();
}
// 任務提交
public void submit(SyncTask task,long delay) {
// If it's a new task:
if (task.getRetryCount() == 0) {
// 遍歷所有的任務 Key
Iterator<String> iterator = task.getKeys().iterator();
while (iterator.hasNext()) {
String key = iterator.next();
// 資料任務放入 Map 中,避免資料同步任務重複提交
if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key,task.getTargetServer()),key))) {
// associated key already exist:
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync already in process,key: {}",key);
}
// 如果任務已經存在,則移除該任務的 Key
iterator.remove();
}
}
}
// 如果所有的任務都已經移除了,結束本次任務提交
if (task.getKeys().isEmpty()) {
// all keys are removed:
return;
}
// 非同步任務執行資料同步
GlobalExecutor.submitDataSync(() -> {
// 1. check the server
if (getServers() == null || getServers().isEmpty()) {
Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
return;
}
// 獲取資料同步任務的實際同步資料
List<String> keys = task.getKeys();
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("try to sync data for this keys {}.",keys);
}
// 2. get the datums by keys and check the datum is empty or not
// 通過key進行批量資料獲取
Map<String,Datum> datumMap = dataStore.batchGet(keys);
// 如果資料已經被移除了,取消本次任務
if (datumMap == null || datumMap.isEmpty()) {
// clear all flags of this task:
for (String key : keys) {
taskMap.remove(buildKey(key,task.getTargetServer()));
}
return;
}
// 資料序列化
byte[] data = serializer.serialize(datumMap);
long timestamp = System.currentTimeMillis();
// 進行增量資料同步提交給其他節點
boolean success = NamingProxy.syncData(data,task.getTargetServer());
// 如果本次資料同步任務失敗,則重新建立SyncTask,設定重試的次數資訊
if (!success) {
SyncTask syncTask = new SyncTask();
syncTask.setKeys(task.getKeys());
syncTask.setRetryCount(task.getRetryCount() + 1);
syncTask.setLastExecuteTime(timestamp);
syncTask.setTargetServer(task.getTargetServer());
retrySync(syncTask);
} else {
// clear all flags of this task:
for (String key : task.getKeys()) {
taskMap.remove(buildKey(key,task.getTargetServer()));
}
}
},delay);
}
// 任務重試
public void retrySync(SyncTask syncTask) {
Server server = new Server();
server.setIp(syncTask.getTargetServer().split(":")[0]);
server.setServePort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
if (!getServers().contains(server)) {
// if server is no longer in healthy server list,ignore this task:
return;
}
// TODO may choose other retry policy.
// 自動延遲重試任務的下次執行時間
submit(syncTask,partitionConfig.getSyncRetryDelay());
}
public void startTimedSync() {
GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
}
// 執行週期任務
// 每次將自己負責的資料進行廣播到其他的 Server 節點
public class TimedSync implements Runnable {
@Override
public void run() {
try {
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("server list is: {}",getServers());
}
// send local timestamps to other servers:
Map<String,String> keyChecksums = new HashMap<>(64);
// 對資料儲存容器的
for (String key : dataStore.keys()) {
// 如果自己不是負責此資料的權威 Server,則無權對此資料做叢集間的廣播通知操作
if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
continue;
}
// 獲取資料操作,
Datum datum = dataStore.get(key);
if (datum == null) {
continue;
}
// 放入資料廣播列表
keyChecksums.put(key,datum.value.getChecksum());
}
if (keyChecksums.isEmpty()) {
return;
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync checksums: {}",keyChecksums);
}
// 對叢集的所有節點(除了自己),做資料廣播操作
for (Server member : getServers()) {
if (NetUtils.localServer().equals(member.getKey())) {
continue;
}
// 叢集間的資料廣播操作
NamingProxy.syncCheckSums(keyChecksums,member.getKey());
}
} catch (Exception e) {
Loggers.DISTRO.error("timed sync task failed.",e);
}
}
}
public List<Server> getServers() {
return serverListManager.getHealthyServers();
}
public String buildKey(String key,String targetServer) {
return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
}
}
複製程式碼
那麼其他節點在接受到資料後的操作是什麼
@RequestMapping(value = "/checksum",method = RequestMethod.PUT)
public ResponseEntity syncChecksum(HttpServletRequest request,HttpServletResponse response) throws Exception {
// 由那個節點傳輸而來的資料
String source = WebUtils.required(request,"source");
String entity = IOUtils.toString(request.getInputStream(),"UTF-8");
// 資料序列化
Map<String,String> dataMap = serializer.deserialize(entity.getBytes(),new TypeReference<Map<String,String>>() {});
// 資料接收操作
consistencyService.onReceiveChecksums(dataMap,source);
return ResponseEntity.ok("ok");
}
複製程式碼
public void onReceiveChecksums(Map<String,String> checksumMap,String server) {
if (syncChecksumTasks.containsKey(server)) {
// Already in process of this server:
Loggers.DISTRO.warn("sync checksum task already in process with {}",server);
return;
}
// 標記當前 Server 傳來的資料正在處理
syncChecksumTasks.put(server,"1");
try {
// 需要更新的 key
List<String> toUpdateKeys = new ArrayList<>();
// 需要刪除的 Key
List<String> toRemoveKeys = new ArrayList<>();
// 對傳來的資料進行遍歷操作
for (Map.Entry<String,String> entry : checksumMap.entrySet()) {
// 如果傳來的資料存在由本節點負責的資料,則直接退出本次資料同步操作(違反了權威server的設定要求)
if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
// this key should not be sent from remote server:
Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
// abort the procedure:
return;
}
// 如果當前資料儲存容器不存在這個資料,或者校驗值不一樣,則進行資料更新操作
if (!dataStore.contains(entry.getKey()) ||
dataStore.get(entry.getKey()).value == null ||
!dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
toUpdateKeys.add(entry.getKey());
}
}
// 直接遍歷資料儲存容器的所有資料
for (String key : dataStore.keys()) {
// 如果資料不是 source server 負責的,則跳過
if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
continue;
}
// 如果同步的資料不包含這個key,表明這個key是需要被刪除的
if (!checksumMap.containsKey(key)) {
toRemoveKeys.add(key);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.info("to remove keys: {},to update keys: {},source: {}",toRemoveKeys,toUpdateKeys,server);
}
// 執行資料閃出去操作
for (String key : toRemoveKeys) {
onRemove(key);
}
if (toUpdateKeys.isEmpty()) {
return;
}
try {
// 根據需要更新的key進行資料拉取,然後對同步的資料進行操作,剩下的如同最開始的全量資料同步所做的操作
byte[] result = NamingProxy.getData(toUpdateKeys,server);
processData(result);
} catch (Exception e) {
Loggers.DISTRO.error("get data from " + server + " failed!",e);
}
finally {
// Remove this 'in process' flag:
// 移除本次 source server 的資料同步任務標識
syncChecksumTasks.remove(server);
}
}
複製程式碼