
A look at nacos ServiceManager's updateInstance

This article walks through updateInstance in nacos ServiceManager.

ServiceManager

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace,Map<group::serviceName,Service>>
     */
    private Map<String,Map<String,Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    //......

    public void updateInstance(String namespaceId,String serviceName,Instance instance) throws NacosException {

        Service service = getService(namespaceId,serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,"service not found,namespace: " + namespaceId + ",service: " + serviceName);
        }

        if (!service.allIPs().contains(instance)) {
            throw new NacosException(NacosException.INVALID_PARAM,"instance not exist: " + instance);
        }

        addInstance(namespaceId,serviceName,instance.isEphemeral(),instance);
    }

    public void addInstance(String namespaceId,String serviceName,boolean ephemeral,Instance... ips) throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId,serviceName,ephemeral);

        Service service = getService(namespaceId,serviceName);

        List<Instance> instanceList = addIpAddresses(service,ephemeral,ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        consistencyService.put(key,instances);
    }

    public List<Instance> addIpAddresses(Service service,boolean ephemeral,Instance... ips) throws NacosException {
        return updateIpAddresses(service,UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD,ephemeral,ips);
    }

    public List<Instance> updateIpAddresses(Service service,String action,boolean ephemeral,Instance... ips) throws NacosException {

        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),service.getName(),ephemeral));

        Map<String,Instance> oldInstanceMap = new HashMap<>(16);
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String,Instance> map = new ConcurrentHashMap<>(currentIPs.size());

        for (Instance instance : currentIPs) {
            map.put(instance.toIPAddr(),instance);
        }
        if (datum != null) {
            oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(),map);
        }

        // use HashMap for deep copy:
        HashMap<String,Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
        instanceMap.putAll(oldInstanceMap);

        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName(),service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(),cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found,ip: {},will create new cluster with default configuration.",instance.getClusterName(),instance.toJSON());
            }

            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());
            } else {
                instanceMap.put(instance.getDatumKey(),instance);
            }
        }

        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty,service: " + service.getName() + ",ip list: " + JSON.toJSONString(instanceMap.values()));
        }

        return new ArrayList<>(instanceMap.values());
    }

    //......
}
  • updateInstance checks via service.allIPs().contains(instance) that the instance to be updated exists; if it does not, it throws a NacosException, otherwise it calls addInstance
  • addInstance fetches the service, calls addIpAddresses, and finally calls consistencyService.put; addIpAddresses delegates to updateIpAddresses with the action UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD
  • updateIpAddresses first fetches the datum from consistencyService, then obtains currentIPs via service.allIPs, and builds oldInstanceMap from the datum; for UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE it removes the instance from instanceMap, and for any other action it puts the instance into instanceMap
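
The reason an update can reuse the add path is that putting an instance under a key that already exists simply overwrites the old entry. Below is a minimal stand-alone sketch of that merge step. SimpleInstance, InstanceMergeSketch and the "ip:port" key format are hypothetical names made up for illustration; they are not nacos classes, and the real Instance.getDatumKey() builds its key differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for nacos' Instance; only the fields the sketch needs.
class SimpleInstance {
    final String ip;
    final int port;
    final double weight;

    SimpleInstance(String ip, int port, double weight) {
        this.ip = ip;
        this.port = port;
        this.weight = weight;
    }

    // Assumed key format, for illustration only.
    String key() {
        return ip + ":" + port;
    }
}

class InstanceMergeSketch {
    static final String ADD = "add";
    static final String REMOVE = "remove";

    // Copies the old instances into a map, then applies the action to each changed instance.
    static List<SimpleInstance> merge(List<SimpleInstance> old, String action, SimpleInstance... changed) {
        Map<String, SimpleInstance> merged = new LinkedHashMap<>();
        for (SimpleInstance instance : old) {
            merged.put(instance.key(), instance);
        }
        for (SimpleInstance instance : changed) {
            if (REMOVE.equals(action)) {
                merged.remove(instance.key());
            } else {
                // Same key: the new instance replaces the old one, which is the "update" case.
                merged.put(instance.key(), instance);
            }
        }
        return new ArrayList<>(merged.values());
    }
}
```

Calling merge with ADD and an instance whose key is already present leaves the list size unchanged and only swaps in the new attributes, which is what updateInstance relies on after its existence check.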

DistroConsistencyServiceImpl.put

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {

    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1,new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);

            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.distro.notifier");

            return t;
        }
    });

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private DataStore dataStore;

    @Autowired
    private TaskDispatcher taskDispatcher;

    @Autowired
    private DataSyncer dataSyncer;

    @Autowired
    private Serializer serializer;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private GlobalConfig globalConfig;

    private boolean initialized = false;

    public volatile Notifier notifier = new Notifier();

    private Map<String,CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

    private Map<String,String> syncChecksumTasks = new ConcurrentHashMap<>(16);

    //......

    public void put(String key,Record value) throws NacosException {
        onPut(key,value);
        taskDispatcher.addTask(key);
    }

    public void onPut(String key,Record value) {

        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key,datum);
        }

        if (!listeners.containsKey(key)) {
            return;
        }

        notifier.addTask(key,ApplyAction.CHANGE);
    }
    //......
}
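  • DistroConsistencyServiceImpl's put method first runs onPut and then taskDispatcher.addTask(key); when the key is an ephemeral instance list key, onPut creates a Datum, increments its timestamp, and stores it in dataStore; finally, if listeners are registered for the key, it calls notifier.addTask(key,ApplyAction.CHANGE)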
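
The put flow has two separate effects: a local notification that only happens when someone listens on the key, and a remote sync that is always queued. A simplified sketch of that split follows. PutFlowSketch and all of its members are hypothetical; plain collections stand in for DataStore, Notifier and TaskDispatcher.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in; not a nacos class.
class PutFlowSketch {
    private final Map<String, String> dataStore = new ConcurrentHashMap<>();
    private final Set<String> listenedKeys = new HashSet<>();

    // Stand-ins for notifier.addTask(...) and taskDispatcher.addTask(...).
    final List<String> localNotifications = new ArrayList<>();
    final List<String> remoteSyncKeys = new ArrayList<>();

    void listen(String key) {
        listenedKeys.add(key);
    }

    // Store and notify locally first, then always queue the key for remote sync.
    void put(String key, String value) {
        onPut(key, value);
        remoteSyncKeys.add(key);
    }

    void onPut(String key, String value) {
        dataStore.put(key, value);
        if (!listenedKeys.contains(key)) {
            return; // nobody listens on this key, so no local notification
        }
        localNotifications.add(key);
    }

    String get(String key) {
        return dataStore.get(key);
    }
}
```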

Notifier.addTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

    public class Notifier implements Runnable {

        private ConcurrentHashMap<String,String> services = new ConcurrentHashMap<>(10 * 1024);

        private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

        public void addTask(String datumKey,ApplyAction action) {

            if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
                return;
            }
            if (action == ApplyAction.CHANGE) {
                services.put(datumKey,StringUtils.EMPTY);
            }
            tasks.add(Pair.with(datumKey,action));
        }

        public int getTaskSize() {
            return tasks.size();
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");

            while (true) {
                try {

                    Pair pair = tasks.take();

                    if (pair == null) {
                        continue;
                    }

                    String datumKey = (String) pair.getValue0();
                    ApplyAction action = (ApplyAction) pair.getValue1();

                    services.remove(datumKey);

                    int count = 0;

                    if (!listeners.containsKey(datumKey)) {
                        continue;
                    }

                    for (RecordListener listener : listeners.get(datumKey)) {

                        count++;

                        try {
                            if (action == ApplyAction.CHANGE) {
                                listener.onChange(datumKey,dataStore.get(datumKey).value);
                                continue;
                            }

                            if (action == ApplyAction.DELETE) {
                                listener.onDelete(datumKey);
                                continue;
                            }
                        } catch (Throwable e) {
                            Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}",datumKey,e);
                        }
                    }

                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified,key: {},listener count: {},action: {}",datumKey,count,action.name());
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task",e);
                }
            }
        }
    }
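  • Notifier's addTask ignores an ApplyAction.CHANGE whose key is already in services; otherwise a CHANGE key is recorded in services and the task is added to tasks. The run method keeps taking entries from tasks, removes the key from services, and invokes the matching listener callback (onChange or onDelete)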
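
The services map works as a "pending" marker so that a burst of changes to the same key produces a single queued task. A minimal sketch of that coalescing idea is shown below. CoalescingQueueSketch is a hypothetical class, and it uses putIfAbsent where Notifier uses containsKey followed by put; the sketch also covers only CHANGE tasks.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in; not a nacos class.
class CoalescingQueueSketch {
    private final ConcurrentHashMap<String, Boolean> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<String> tasks = new LinkedBlockingQueue<>();

    // Queues a change for the key unless one is already waiting; returns whether it was queued.
    boolean addChange(String key) {
        if (pending.putIfAbsent(key, Boolean.TRUE) != null) {
            return false;
        }
        tasks.add(key);
        return true;
    }

    // Takes the next key (or null if empty) and clears its pending mark,
    // so that later changes to the same key are queued again.
    String poll() {
        String key = tasks.poll();
        if (key != null) {
            pending.remove(key);
        }
        return key;
    }

    int size() {
        return tasks.size();
    }
}
```

Because the listener reads the latest value from the store when it runs, dropping the duplicate tasks loses no information.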

TaskDispatcher.addTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/TaskDispatcher.java

@Component
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    @PostConstruct
    public void init() {
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    public void addTask(String key) {
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key,cpuCoreCount)).addTask(key);
    }

    public class TaskScheduler implements Runnable {

        private int index;

        private int dataSize = 0;

        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            queue.offer(key);
        }

        public int getIndex() {
            return index;
        }

        @Override
        public void run() {

            List<String> keys = new ArrayList<>();
            while (true) {

                try {

                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),TimeUnit.MILLISECONDS);

                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}",key);
                    }

                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }

                    if (StringUtils.isBlank(key)) {
                        continue;
                    }

                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }

                    keys.add(key);
                    dataSize++;

                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {

                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());

                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}",JSON.toJSONString(syncTask));
                            }

                            dataSyncer.submit(syncTask,0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }

                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.",e);
                }
            }
        }
    }
}
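  • TaskDispatcher's addTask picks a TaskScheduler from taskSchedulerList based on the key (UtilsAndCommons.shakeUp(key,cpuCoreCount)) and calls its addTask; TaskScheduler's addTask offers the key to queue, while run keeps polling keys from queue, batches them, and submits a SyncTask for every other server through dataSyncer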
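
The flush condition in TaskScheduler.run is "batch is full, or enough time has passed since the last dispatch". A simplified sketch of that rule follows. BatchingSketch is hypothetical, and the clock is passed in as a parameter to keep the example deterministic, whereas TaskScheduler reads System.currentTimeMillis() itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in; not a nacos class.
class BatchingSketch {
    private final int batchSize;
    private final long periodMillis;
    private List<String> keys = new ArrayList<>();
    private long lastDispatchTime;

    // Each element is one flushed batch; stands in for the submitted sync tasks.
    final List<List<String>> dispatched = new ArrayList<>();

    BatchingSketch(int batchSize, long periodMillis, long startMillis) {
        this.batchSize = batchSize;
        this.periodMillis = periodMillis;
        this.lastDispatchTime = startMillis;
    }

    // Adds a key and flushes when the batch is full or the period has elapsed.
    void add(String key, long nowMillis) {
        keys.add(key);
        if (keys.size() == batchSize || nowMillis - lastDispatchTime > periodMillis) {
            dispatched.add(keys);
            keys = new ArrayList<>(); // start a fresh batch, the old list now belongs to the task
            lastDispatchTime = nowMillis;
        }
    }
}
```

One thing the sketch makes visible: the time check only runs when a key arrives, so a partially filled batch waits until the next key shows up.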

SyncTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/SyncTask.java

public class SyncTask {

    private List<String> keys;

    private int retryCount;

    private long lastExecuteTime;

    private String targetServer;

    public List<String> getKeys() {
        return keys;
    }

    public void setKeys(List<String> keys) {
        this.keys = keys;
    }

    public int getRetryCount() {
        return retryCount;
    }

    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }

    public long getLastExecuteTime() {
        return lastExecuteTime;
    }

    public void setLastExecuteTime(long lastExecuteTime) {
        this.lastExecuteTime = lastExecuteTime;
    }

    public String getTargetServer() {
        return targetServer;
    }

    public void setTargetServer(String targetServer) {
        this.targetServer = targetServer;
    }
}
  • SyncTask holds the keys and targetServer properties (plus retryCount and lastExecuteTime); targetServer tells DataSyncer which server to run the sync against
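
Since each SyncTask carries a single targetServer, one batch of keys fans out into one task per remote server, skipping the local one. A small sketch of that fan-out is below. SimpleSyncTask and FanOutSketch are hypothetical simplifications, with servers represented as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down stand-in for SyncTask.
class SimpleSyncTask {
    final List<String> keys;
    final String targetServer;

    SimpleSyncTask(List<String> keys, String targetServer) {
        this.keys = keys;
        this.targetServer = targetServer;
    }
}

class FanOutSketch {
    // Builds one task per server, excluding the local server.
    static List<SimpleSyncTask> tasksFor(List<String> keys, List<String> servers, String localServer) {
        List<SimpleSyncTask> tasks = new ArrayList<>();
        for (String server : servers) {
            if (localServer.equals(server)) {
                continue; // no need to sync to ourselves
            }
            tasks.add(new SimpleSyncTask(keys, server));
        }
        return tasks;
    }
}
```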

Summary

  • updateInstance checks via service.allIPs().contains(instance) that the instance to be updated exists; if it does not, it throws a NacosException, otherwise it calls addInstance
  • addInstance fetches the service, calls addIpAddresses, and finally calls consistencyService.put; addIpAddresses delegates to updateIpAddresses with the action UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD
  • updateIpAddresses first fetches the datum from consistencyService, then obtains currentIPs via service.allIPs, and builds oldInstanceMap from the datum; for UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE it removes the instance from instanceMap, and for any other action it puts the instance into instanceMap
