A look at registerInstance in nacos ServiceManager

This article looks at the registerInstance method of nacos's ServiceManager.

ServiceManager

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace,Map<group::serviceName,Service>>
     */
    private Map<String,Map<String,Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    //......

    public void registerInstance(String namespaceId,String serviceName,Instance instance) throws NacosException {

        createEmptyService(namespaceId,serviceName,instance.isEphemeral());

        Service service = getService(namespaceId,serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                "service not found,namespace: " + namespaceId + ",service: " + serviceName);
        }

        addInstance(namespaceId,serviceName,instance.isEphemeral(),instance);
    }

    public void createEmptyService(String namespaceId,String serviceName,boolean local) throws NacosException {
        createServiceIfAbsent(namespaceId,serviceName,local,null);
    }

    public void createServiceIfAbsent(String namespaceId,String serviceName,boolean local,Cluster cluster) throws NacosException {
        Service service = getService(namespaceId,serviceName);
        if (service == null) {

            Loggers.SRV_LOG.info("creating empty service {}:{}",namespaceId,serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed,exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(),cluster);
            }
            service.validate();
            if (local) {
                putServiceAndInit(service);
            } else {
                addOrReplaceService(service);
            }
        }
    }

    public Service getService(String namespaceId,String serviceName) {
        if (serviceMap.get(namespaceId) == null) {
            return null;
        }
        return chooseServiceMap(namespaceId).get(serviceName);
    }

    public Map<String,Service> chooseServiceMap(String namespaceId) {
        return serviceMap.get(namespaceId);
    }

    private void putServiceAndInit(Service service) throws NacosException {
        putService(service);
        service.init();
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),service.getName(),true),service);
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),service.getName(),false),service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}",service.toJSON());
    }

    public void putService(Service service) {
        if (!serviceMap.containsKey(service.getNamespaceId())) {
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(service.getNamespaceId())) {
                    serviceMap.put(service.getNamespaceId(),new ConcurrentHashMap<>(16));
                }
            }
        }
        serviceMap.get(service.getNamespaceId()).put(service.getName(),service);
    }

    public void addOrReplaceService(Service service) throws NacosException {
        consistencyService.put(KeyBuilder.buildServiceMetaKey(service.getNamespaceId(),service.getName()),service);
    }

    public void addInstance(String namespaceId,String serviceName,boolean ephemeral,Instance... ips) throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId,serviceName,ephemeral);

        Service service = getService(namespaceId,serviceName);

        List<Instance> instanceList = addIpAddresses(service,ephemeral,ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        consistencyService.put(key,instances);
    }

    public List<Instance> addIpAddresses(Service service,boolean ephemeral,Instance... ips) throws NacosException {
        return updateIpAddresses(service,UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD,ephemeral,ips);
    }

    public List<Instance> updateIpAddresses(Service service,String action,boolean ephemeral,Instance... ips) throws NacosException {

        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),service.getName(),ephemeral));

        Map<String,Instance> oldInstanceMap = new HashMap<>(16);
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String,Instance> map = new ConcurrentHashMap<>(currentIPs.size());

        for (Instance instance : currentIPs) {
            map.put(instance.toIPAddr(),instance);
        }
        if (datum != null) {
            oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(),map);
        }

        // use HashMap for deep copy:
        HashMap<String,Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
        instanceMap.putAll(oldInstanceMap);

        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName(),service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(),cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found,ip: {},will create new cluster with default configuration.",
                    instance.getClusterName(),instance.toJSON());
            }

            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());
            } else {
                instanceMap.put(instance.getDatumKey(),instance);
            }
        }

        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty,service: " + service.getName() + ",ip list: "
                + JSON.toJSONString(instanceMap.values()));
        }

        return new ArrayList<>(instanceMap.values());
    }

    //......
}
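  • registerInstance first calls createEmptyService, which creates the service if it does not exist yet, then fetches the service, and finally calls addInstance. createEmptyService takes its local argument from instance.isEphemeral() and delegates to createServiceIfAbsent with cluster set to null. createServiceIfAbsent looks the service up with getService (which reads from serviceMap) and creates it when the lookup returns null; if local is true it calls putServiceAndInit, otherwise addOrReplaceService.
  • putServiceAndInit calls putService, then service.init, and then registers the service as a listener for both the ephemeral and the persistent instance-list keys through consistencyService.listen. putService adds the service to serviceMap, creating the namespace's inner map under putServiceLock when it is missing. addOrReplaceService calls consistencyService.put with the service meta key.
  • addInstance fetches the service, calls addIpAddresses, and finally passes the resulting instance list to consistencyService.put. addIpAddresses delegates to updateIpAddresses with action set to UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD. updateIpAddresses first reads the datum from consistencyService, gets currentIPs from service.allIPs, builds oldInstanceMap from the datum, and copies it into instanceMap. It then adds each given instance by its datum key (or removes it when the action is remove) and returns the map's values as a list.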
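
The flow above can be condensed into a small, self-contained sketch. This is not nacos code: RegistrySketch and updateInstances are hypothetical names, instances are reduced to "ip:port" strings, and computeIfAbsent is swapped in for the double-checked synchronized block that putService uses. The merged list is also written straight back into the map, whereas nacos hands it to consistencyService.put.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/** Toy registry; method names mirror the nacos ones, but this is only an illustration. */
class RegistrySketch {

    /** namespaceId -> (serviceName -> set of "ip:port" instance keys). */
    private final Map<String, Map<String, Set<String>>> serviceMap = new ConcurrentHashMap<>();

    /** Creates the namespace map and the service entry only when they are missing. */
    void createServiceIfAbsent(String namespaceId, String serviceName) {
        serviceMap
            .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
            .computeIfAbsent(serviceName, name -> ConcurrentHashMap.<String>newKeySet());
    }

    /** Returns null when the namespace or the service is unknown. */
    Set<String> getService(String namespaceId, String serviceName) {
        Map<String, Set<String>> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    /** Ensures the service exists, merges the instance in, and returns the new instance list. */
    List<String> registerInstance(String namespaceId, String serviceName, String ip, int port) {
        createServiceIfAbsent(namespaceId, serviceName);
        Set<String> service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new IllegalStateException("service not found: " + namespaceId + "/" + serviceName);
        }
        List<String> merged = updateInstances(service, "add", ip + ":" + port);
        // Simplification: write the merged list straight back instead of publishing it.
        service.retainAll(merged);
        service.addAll(merged);
        return merged;
    }

    /** Copies the current keys, then adds or removes the given keys on the copy. */
    static List<String> updateInstances(Set<String> current, String action, String... keys) {
        Set<String> merged = new TreeSet<>(current); // sorted copy for stable output
        for (String key : keys) {
            if ("remove".equals(action)) {
                merged.remove(key);
            } else {
                merged.add(key);
            }
        }
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.registerInstance("public", "demo", "10.0.0.1", 8080);
        System.out.println(registry.registerInstance("public", "demo", "10.0.0.2", 8080));
    }
}
```

Registering the same address twice leaves the list unchanged, which corresponds to instanceMap.put overwriting an existing datum key in updateIpAddresses.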

Service.init

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java

public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record,RecordListener<Instances> {

    private static final String SERVICE_NAME_SYNTAX = "[0-9a-zA-Z@\\.:_-]+";

    @JSONField(serialize = false)
    private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);

    private String token;
    private List<String> owners = new ArrayList<>();
    private Boolean resetWeight = false;
    private Boolean enabled = true;
    private Selector selector = new NoneSelector();
    private String namespaceId;

    /**
     * IP will be deleted if it has not send beat for some time,default timeout is 30 seconds.
     */
    private long ipDeleteTimeout = 30 * 1000;

    private volatile long lastModifiedMillis = 0L;

    private volatile String checksum;

    //......

    public void init() {

        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);

        for (Map.Entry<String,Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }

    //......
}
  • init schedules the ClientBeatCheckTask through HealthCheckReactor.scheduleCheck(clientBeatCheckTask), then sets this service on every Cluster in clusterMap and calls each Cluster's init method.
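
HealthCheckReactor itself is not shown in this article, so the following is only a sketch of what "scheduling" a beat check could look like. It assumes a ScheduledExecutorService that runs the task at a fixed delay and keeps one future per task key (ClientBeatCheckTask exposes taskKey() for such a key). BeatCheckScheduler, scheduleCheck's signature here, and the delay values are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical scheduler: one periodic task per key, assumed rather than taken from nacos. */
class BeatCheckScheduler {

    // Daemon thread so the sketch never keeps the JVM alive.
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "beat-check-sketch");
            thread.setDaemon(true);
            return thread;
        });

    private final Map<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

    /** Schedules the task unless one is already registered under taskKey; returns true if scheduled. */
    boolean scheduleCheck(String taskKey, Runnable task, long delayMillis) {
        boolean[] scheduled = {false};
        futures.computeIfAbsent(taskKey, key -> {
            scheduled[0] = true;
            return executor.scheduleWithFixedDelay(task, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
        });
        return scheduled[0];
    }

    /** Stops the periodic task registered under taskKey, if any. */
    void cancelCheck(String taskKey) {
        ScheduledFuture<?> future = futures.remove(taskKey);
        if (future != null) {
            future.cancel(true);
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        BeatCheckScheduler scheduler = new BeatCheckScheduler();
        AtomicInteger runs = new AtomicInteger();
        scheduler.scheduleCheck("demo", runs::incrementAndGet, 50);
        Thread.sleep(300);
        scheduler.cancelCheck("demo");
        scheduler.shutdown();
        System.out.println("runs > 0: " + (runs.get() > 0));
    }
}
```

Keying the futures by task key means that calling scheduleCheck twice for the same service does not start a second periodic check.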

ClientBeatCheckTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java

public class ClientBeatCheckTask implements Runnable {

    private Service service;

    public ClientBeatCheckTask(Service service) {
        this.service = service;
    }


    @JSONField(serialize = false)
    public PushService getPushService() {
        return SpringContext.getAppContext().getBean(PushService.class);
    }

    @JSONField(serialize = false)
    public DistroMapper getDistroMapper() {
        return SpringContext.getAppContext().getBean(DistroMapper.class);
    }

    public GlobalConfig getGlobalConfig() {
        return SpringContext.getAppContext().getBean(GlobalConfig.class);
    }

    public String taskKey() {
        return service.getName();
    }

    @Override
    public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }

            List<Instance> instances = service.allIPs(true);

            // first set health status of instances:
            for (Instance instance : instances) {
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{},region: {},msg: client timeout after {},last beat: {}",instance.getIp(),instance.getPort(),instance.getClusterName(),service.getName(),UtilsAndCommons.LOCALHOST_SITE,instance.getInstanceHeartBeatTimeOut(),instance.getLastBeat());
                            getPushService().serviceChanged(service);
                            SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this,instance));
                        }
                    }
                }
            }

            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }

            // then remove obsolete instances:
            for (Instance instance : instances) {

                if (instance.isMarked()) {
                    continue;
                }

                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {},ip: {}",service.getName(),JSON.toJSONString(instance));
                    deleteIP(instance);
                }
            }

        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.",e);
        }

    }


    private void deleteIP(Instance instance) {

        try {
            NamingProxy.Request request = NamingProxy.Request.newRequest();
            request.appendParam("ip",instance.getIp())
                .appendParam("port",String.valueOf(instance.getPort()))
                .appendParam("ephemeral","true")
                .appendParam("clusterName",instance.getClusterName())
                .appendParam("serviceName",service.getName())
                .appendParam("namespaceId",service.getNamespaceId());

            String url = "http://127.0.0.1:" + RunningConfig.getServerPort() + RunningConfig.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();

            // delete instance asynchronously:
            HttpClient.asyncHttpDelete(url,null,new AsyncCompletionHandler() {
                @Override
                public Object onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically,ip: {},caused {},resp code: {}",instance.toJSON(),response.getResponseBody(),response.getStatusCode());
                    }
                    return null;
                }
            });

        } catch (Exception e) {
            Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically,error: {}",e);
        }
    }
}
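  • ClientBeatCheckTask implements Runnable. Its run method first checks whether this node is responsible for the service (DistroMapper.responsible) and returns if it is not. Otherwise it fetches the service's ephemeral instances with service.allIPs(true) and handles those whose last heartbeat is older than instanceHeartBeatTimeOut: an instance that is not marked and is still healthy has healthy set to false, after which pushService.serviceChanged is triggered and an InstanceHeartbeatTimeoutEvent is published. If GlobalConfig.isExpireInstance() returns true, the task then iterates over the instances a second time and calls deleteIP for every non-marked instance whose last heartbeat is older than ipDeleteTimeout (not instanceHeartBeatTimeOut). deleteIP sends an asynchronous HTTP DELETE to the local nacos server's instance endpoint to remove the instance.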
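
The two passes of run() use two different thresholds, which is easy to miss in the listing. Below is a minimal sketch of that logic with the clock passed in as a parameter. BeatCheckSketch and Inst are hypothetical stand-ins for the task and for Instance; the 30-second delete timeout matches the default quoted in Service above, while the 15-second heartbeat timeout is only an assumed value for illustration. The push, the event, and the HTTP delete are replaced by a returned list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the two-pass beat check; not the nacos implementation. */
class BeatCheckSketch {

    static final long HEART_BEAT_TIMEOUT_MS = 15_000L; // assumed value for illustration
    static final long IP_DELETE_TIMEOUT_MS = 30_000L;  // default quoted in Service above

    /** Minimal stand-in for an ephemeral Instance. */
    static class Inst {
        final String address;
        final long lastBeat;
        final boolean marked;
        boolean healthy = true;

        Inst(String address, long lastBeat, boolean marked) {
            this.address = address;
            this.lastBeat = lastBeat;
            this.marked = marked;
        }
    }

    /**
     * Pass 1 flips healthy to false for unmarked instances past the heartbeat timeout.
     * Pass 2 (only when expireInstance is true) returns the addresses of unmarked
     * instances past the delete timeout; the real task would call deleteIP for each.
     */
    static List<String> check(List<Inst> instances, long now, boolean expireInstance) {
        for (Inst instance : instances) {
            if (now - instance.lastBeat > HEART_BEAT_TIMEOUT_MS && !instance.marked && instance.healthy) {
                instance.healthy = false;
            }
        }

        List<String> toDelete = new ArrayList<>();
        if (!expireInstance) {
            return toDelete;
        }
        for (Inst instance : instances) {
            if (instance.marked) {
                continue;
            }
            if (now - instance.lastBeat > IP_DELETE_TIMEOUT_MS) {
                toDelete.add(instance.address);
            }
        }
        return toDelete;
    }

    public static void main(String[] args) {
        Inst stale = new Inst("10.0.0.1:8080", 0L, false);
        System.out.println(check(Arrays.asList(stale), 20_000L, true) + " healthy=" + stale.healthy);
        System.out.println(check(Arrays.asList(stale), 31_000L, true));
    }
}
```

An instance that stops sending beats is therefore first reported as unhealthy and only later removed, and a marked instance is skipped by both passes.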

Summary

  • registerInstance creates the service when it is missing, fetches it, and then calls addInstance. The creation goes through createEmptyService and createServiceIfAbsent, with local taken from instance.isEphemeral() and cluster set to null; a local service goes to putServiceAndInit, any other to addOrReplaceService.
  • putServiceAndInit stores the service in serviceMap through putService, calls service.init, and registers the service as a listener through consistencyService.listen. addOrReplaceService only calls consistencyService.put.
  • addInstance fetches the service, builds the new instance list through addIpAddresses (which calls updateIpAddresses with UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD), and hands that list to consistencyService.put. updateIpAddresses merges the given instances into a copy of the instances recorded in the datum read from consistencyService.
