1. 程式人生 > 程式設計 >Nacos一致性協議實現之Distro協議淺析

Nacos一致性協議實現之Distro協議淺析

原文連結:www.liaochuntao.cn/2019/09/16/…

前期導讀

Nacos 中的 DistroConsistencyServiceImpl 工作淺析

之前的文章說的很淺顯,這次打算重頭好好解析下Nacos中使用的alibaba自研的AP協議——Distro

核心程式碼實現

Nacos Naming 模組啟動做的時資料同步

DistroConsistencyServiceImpl

public void load() throws Exception {
  if (SystemUtils.STANDALONE_MODE) {
    initialized = true;
    return
; } // size = 1 means only myself in the list,we need at least one another server alive: // 叢集模式下,需要等待至少兩個節點才可以將邏輯進行 while (serverListManager.getHealthyServers().size() <= 1) { Thread.sleep(1000L); Loggers.DISTRO.info("waiting server list init..."); } // 獲取所有健康的叢集節點 for (Server server : serverListManager.getHealthyServers()) { // 自己則不需要進行資料同步廣播操作
if (NetUtils.localServer().equals(server.getKey())) { continue; } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("sync from " + server); } // 從別的伺服器進行全量資料拉取操作,只需要執行一次即可,剩下的交由增量同步任務去完成 if (syncAllDataFromRemote(server)) { initialized = true; return
; } } } 複製程式碼
全量資料拉取的動作

資料拉取執行者的動作

public boolean syncAllDataFromRemote(Server server) {
  try {
    // 獲取資料
    byte[] data = NamingProxy.getAllData(server.getKey());
    // 接收到的資料進行處理
    processData(data);
    return true;
  } catch (Exception e) {
    Loggers.DISTRO.error("sync full data from " + server + " failed!",e);
    return false;
  }
}
複製程式碼

資料提供者的響應

@RequestMapping(value = "/datums",method = RequestMethod.GET)
public ResponseEntity getAllDatums(HttpServletRequest request,HttpServletResponse response) throws Exception {
  // 直接將儲存的資料容器——Map進行序列化傳輸
  String content = new String(serializer.serialize(dataStore.getDataMap()),StandardCharsets.UTF_8);
  return ResponseEntity.ok(content);
}
複製程式碼

接下來,當從某一個Server Node拉取了全量資料後的操作

public void processData(byte[] data) throws Exception {
        if (data.length > 0) {
            // 先將資料進行反序列化
            Map<String,Datum<Instances>> datumMap =
                serializer.deserializeMap(data,Instances.class);

            // 對資料進行遍歷處理
            for (Map.Entry<String,Datum<Instances>> entry : datumMap.entrySet()) {
                // 資料放入資料儲存容器——DataStore中
                dataStore.put(entry.getKey(),entry.getValue());
                // 判斷監聽器是否包含了對這個Key的監聽,如果沒有,表明是一個新的資料
                if (!listeners.containsKey(entry.getKey())) {
                    // pretty sure the service not exist:
                    if (switchDomain.isDefaultInstanceEphemeral()) {
                        // create empty service
                        Loggers.DISTRO.info("creating service {}",entry.getKey());
                        Service service = new Service();
                        String serviceName = KeyBuilder.getServiceName(entry.getKey());
                        String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                        service.setName(serviceName);
                        service.setNamespaceId(namespaceId);
                        service.setGroupName(Constants.DEFAULT_GROUP);
                        // now validate the service. if failed,exception will be thrown
                        service.setLastModifiedMillis(System.currentTimeMillis());
                        service.recalculateChecksum();
                        // 回撥 Listener 監聽器,告知新的Service資料
                        listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
                            .onChange(KeyBuilder.buildServiceMetaKey(namespaceId,serviceName),service);
                    }
                }
            }
            // 進行 Listener 的監聽回撥
            for (Map.Entry<String,Datum<Instances>> entry : datumMap.entrySet()) {
                if (!listeners.containsKey(entry.getKey())) {
                    // Should not happen:
                    Loggers.DISTRO.warn("listener of {} not found.",entry.getKey());
                    continue;
                }

                try {
                    for (RecordListener listener : listeners.get(entry.getKey())) {
                        listener.onChange(entry.getKey(),entry.getValue().value);
                    }
                } catch (Exception e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}",entry.getKey(),e);
                    continue;
                }

                // Update data store if listener executed successfully:
                dataStore.put(entry.getKey(),entry.getValue());
            }
        }
    }
複製程式碼

到這裡,Nacos Naming模組的Distro協議的初次啟動時的資料全量同步到這裡就告一段落了,接下來就是資料的增量同步了,首先要介紹一個Distro協議的一個概念——權威Server

權威Server的判斷器

public class DistroMapper implements ServerChangeListener {

    private List<String> healthyList = new ArrayList<>();

    public List<String> getHealthyList() {
        return healthyList;
    }

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private ServerListManager serverListManager;

    /**
     * init server list
     */
    @PostConstruct
    public void init() {
        serverListManager.listen(this);
    }

    // 判斷該資料是否可以由本節點進行響應
    public boolean responsible(Cluster cluster,Instance instance) {
        return switchDomain.isHealthCheckEnabled(cluster.getServiceName())
            && !cluster.getHealthCheckTask().isCancelled()
            && responsible(cluster.getServiceName())
            && cluster.contains(instance);
    }

    // 根據 ServiceName 進行 Hash 計算,找到對應的權威節點的索引,判斷是否是本節點,是的話表明該資料可以由本節點進行處理
    public boolean responsible(String serviceName) {
        if (!switchDomain.isDistroEnabled() || SystemUtils.STANDALONE_MODE) {
            return true;
        }

        if (CollectionUtils.isEmpty(healthyList)) {
            // means distro config is not ready yet
            return false;
        }

        int index = healthyList.indexOf(NetUtils.localServer());
        int lastIndex = healthyList.lastIndexOf(NetUtils.localServer());
        if (lastIndex < 0 || index < 0) {
            return true;
        }

        int target = distroHash(serviceName) % healthyList.size();
        return target >= index && target <= lastIndex;
    }

    // 根據 ServiceName 找到權威 Server 的地址
    public String mapSrv(String serviceName) {
        if (CollectionUtils.isEmpty(healthyList) || !switchDomain.isDistroEnabled()) {
            return NetUtils.localServer();
        }

        try {
            return healthyList.get(distroHash(serviceName) % healthyList.size());
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("distro mapper failed,return localhost: " + NetUtils.localServer(),e);

            return NetUtils.localServer();
        }
    }

    public int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    @Override
    public void onChangeServerList(List<Server> latestMembers) {

    }

    @Override
    public void onChangeHealthyServerList(List<Server> latestReachableMembers) {

        List<String> newHealthyList = new ArrayList<>();
        for (Server server : latestReachableMembers) {
            newHealthyList.add(server.getKey());
        }
        healthyList = newHealthyList;
    }
}
複製程式碼

上面的元件,就是Distro協議的一個重要部分,根據資料進行 Hash 計算查詢叢集節點列表中的權威節點

節點間的資料增量同步
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    @PostConstruct
    public void init() {
        // 構建任務執行器
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            // 任務排程執行器提交
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    public void addTask(String key) {
        // 根據 Key 進行 Hash 找到一個 TaskScheduler 進行任務提交
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key,cpuCoreCount)).addTask(key);
    }

    public class TaskScheduler implements Runnable {

        private int index;

        private int dataSize = 0;

        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            queue.offer(key);
        }

        public int getIndex() {
            return index;
        }

        @Override
        public void run() {
            List<String> keys = new ArrayList<>();
            while (true) {
                try {
                    // 從任務快取佇列中獲取一個任務(存在超時設定)
                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),TimeUnit.MILLISECONDS);
                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}",key);
                    }
                    // 如果不存在叢集或者叢集節點為空
                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }
                    if (StringUtils.isBlank(key)) {
                        continue;
                    }
                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }
                    // 進行批量任務處理,這裡做一次暫存操作,為了避免
                    keys.add(key);
                    dataSize++;
                    // 如果此時的任務暫存數量達到了指定的批量,或者任務的時間達到了最大設定,進行資料同步任務
                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
                        // 為每一個server建立一個SyncTask任務
                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());
                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}",JSON.toJSONString(syncTask));
                            }
                            // 進行任務提交,同時設定任務延遲執行時間,這裡設定為立即執行
                            dataSyncer.submit(syncTask,0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }
                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.",e);
                }
            }
        }
    }
}
複製程式碼
DataSyncer 資料同步任務的真正執行者
public class DataSyncer {

    ...

    @PostConstruct
    public void init() {
        // 執行定期的資料同步任務(每五秒執行一次)
        startTimedSync();
    }

    // 任務提交
    public void submit(SyncTask task,long delay) {
        // If it's a new task:
        if (task.getRetryCount() == 0) {
            // 遍歷所有的任務 Key
            Iterator<String> iterator = task.getKeys().iterator();
            while (iterator.hasNext()) {
                String key = iterator.next();
                // 資料任務放入 Map 中,避免資料同步任務重複提交
                if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key,task.getTargetServer()),key))) {
                    // associated key already exist:
                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("sync already in process,key: {}",key);
                    }
                    // 如果任務已經存在,則移除該任務的 Key
                    iterator.remove();
                }
            }
        }
        // 如果所有的任務都已經移除了,結束本次任務提交
        if (task.getKeys().isEmpty()) {
            // all keys are removed:
            return;
        }
        // 非同步任務執行資料同步
        GlobalExecutor.submitDataSync(() -> {
            // 1. check the server
            if (getServers() == null || getServers().isEmpty()) {
                Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
                return;
            }
            // 獲取資料同步任務的實際同步資料
            List<String> keys = task.getKeys();
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("try to sync data for this keys {}.",keys);
            }
            // 2. get the datums by keys and check the datum is empty or not
            // 通過key進行批量資料獲取
            Map<String,Datum> datumMap = dataStore.batchGet(keys);
            // 如果資料已經被移除了,取消本次任務
            if (datumMap == null || datumMap.isEmpty()) {
                // clear all flags of this task:
                for (String key : keys) {
                    taskMap.remove(buildKey(key,task.getTargetServer()));
                }
                return;
            }
            // 資料序列化
            byte[] data = serializer.serialize(datumMap);
            long timestamp = System.currentTimeMillis();
            // 進行增量資料同步提交給其他節點
            boolean success = NamingProxy.syncData(data,task.getTargetServer());
            // 如果本次資料同步任務失敗,則重新建立SyncTask,設定重試的次數資訊
            if (!success) {
                SyncTask syncTask = new SyncTask();
                syncTask.setKeys(task.getKeys());
                syncTask.setRetryCount(task.getRetryCount() + 1);
                syncTask.setLastExecuteTime(timestamp);
                syncTask.setTargetServer(task.getTargetServer());
                retrySync(syncTask);
            } else {
                // clear all flags of this task:
                for (String key : task.getKeys()) {
                    taskMap.remove(buildKey(key,task.getTargetServer()));
                }
            }
        },delay);
    }

    // 任務重試
    public void retrySync(SyncTask syncTask) {
        Server server = new Server();
        server.setIp(syncTask.getTargetServer().split(":")[0]);
        server.setServePort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
        if (!getServers().contains(server)) {
            // if server is no longer in healthy server list,ignore this task:
            return;
        }
        // TODO may choose other retry policy.
        // 自動延遲重試任務的下次執行時間
        submit(syncTask,partitionConfig.getSyncRetryDelay());
    }

    public void startTimedSync() {
        GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
    }

    // 執行週期任務
    // 每次將自己負責的資料進行廣播到其他的 Server 節點
    public class TimedSync implements Runnable {

        @Override
        public void run() {
            try {
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("server list is: {}",getServers());
                }
                // send local timestamps to other servers:
                Map<String,String> keyChecksums = new HashMap<>(64);
                // 對資料儲存容器的
                for (String key : dataStore.keys()) {
                    // 如果自己不是負責此資料的權威 Server,則無權對此資料做叢集間的廣播通知操作
                    if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
                        continue;
                    }
                    // 獲取資料操作,
                    Datum datum = dataStore.get(key);
                    if (datum == null) {
                        continue;
                    }
                    // 放入資料廣播列表
                    keyChecksums.put(key,datum.value.getChecksum());
                }
                if (keyChecksums.isEmpty()) {
                    return;
                }
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("sync checksums: {}",keyChecksums);
                }
                // 對叢集的所有節點(除了自己),做資料廣播操作
                for (Server member : getServers()) {
                    if (NetUtils.localServer().equals(member.getKey())) {
                        continue;
                    }
                    // 叢集間的資料廣播操作
                    NamingProxy.syncCheckSums(keyChecksums,member.getKey());
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("timed sync task failed.",e);
            }
        }
    }

    public List<Server> getServers() {
        return serverListManager.getHealthyServers();
    }

    public String buildKey(String key,String targetServer) {
        return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
    }
}
複製程式碼

那麼其他節點在接受到資料後的操作是什麼

@RequestMapping(value = "/checksum",method = RequestMethod.PUT)
public ResponseEntity syncChecksum(HttpServletRequest request,HttpServletResponse response) throws Exception {
    // 由那個節點傳輸而來的資料
    String source = WebUtils.required(request,"source");
    String entity = IOUtils.toString(request.getInputStream(),"UTF-8");
    // 資料序列化
    Map<String,String> dataMap = serializer.deserialize(entity.getBytes(),new TypeReference<Map<String,String>>() {});
    // 資料接收操作
    consistencyService.onReceiveChecksums(dataMap,source);
    return ResponseEntity.ok("ok");
}
複製程式碼
public void onReceiveChecksums(Map<String,String> checksumMap,String server) {
    if (syncChecksumTasks.containsKey(server)) {
        // Already in process of this server:
        Loggers.DISTRO.warn("sync checksum task already in process with {}",server);
        return;
    }
    // 標記當前 Server 傳來的資料正在處理
    syncChecksumTasks.put(server,"1");
    try {
        // 需要更新的 key
        List<String> toUpdateKeys = new ArrayList<>();
        // 需要刪除的 Key
        List<String> toRemoveKeys = new ArrayList<>();
        // 對傳來的資料進行遍歷操作
        for (Map.Entry<String,String> entry : checksumMap.entrySet()) {
            // 如果傳來的資料存在由本節點負責的資料,則直接退出本次資料同步操作(違反了權威server的設定要求)
            if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
                // this key should not be sent from remote server:
                Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
                // abort the procedure:
                return;
            }
            // 如果當前資料儲存容器不存在這個資料,或者校驗值不一樣,則進行資料更新操作
            if (!dataStore.contains(entry.getKey()) || 
            dataStore.get(entry.getKey()).value == null || 
            !dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
                toUpdateKeys.add(entry.getKey());
            }
        }
        // 直接遍歷資料儲存容器的所有資料
        for (String key : dataStore.keys()) {
            // 如果資料不是 source server 負責的,則跳過
            if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
                continue;
            }
            // 如果同步的資料不包含這個key,表明這個key是需要被刪除的
            if (!checksumMap.containsKey(key)) {
                toRemoveKeys.add(key);
            }
        }
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.info("to remove keys: {},to update keys: {},source: {}",toRemoveKeys,toUpdateKeys,server);
        }
        // 執行資料閃出去操作
        for (String key : toRemoveKeys) {
            onRemove(key);
        }
        if (toUpdateKeys.isEmpty()) {
            return;
        }
        try {
            // 根據需要更新的key進行資料拉取,然後對同步的資料進行操作,剩下的如同最開始的全量資料同步所做的操作
            byte[] result = NamingProxy.getData(toUpdateKeys,server);
            processData(result);
        } catch (Exception e) {
            Loggers.DISTRO.error("get data from " + server + " failed!",e);
        }
    finally {
        // Remove this 'in process' flag:
        // 移除本次 source server 的資料同步任務標識
        syncChecksumTasks.remove(server);
    }
}
複製程式碼