1. 程式人生 > 程式設計 >聊聊nacos ServiceManager的removeInstance

聊聊nacos ServiceManager的removeInstance

本文主要研究一下nacos ServiceManager的removeInstance

ServiceManager

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace,Map<group::serviceName,Service>>
     */
    private Map<String,Map<String,Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate"
) private ConsistencyService consistencyService; @Autowired private SwitchDomain switchDomain; @Autowired private DistroMapper distroMapper; @Autowired private ServerListManager serverListManager; @Autowired private PushService pushService; private final Object putServiceLock = new Object(); //...... public void removeInstance(String namespaceId,String serviceName,boolean ephemeral,Instance... ips) throws NacosException { Service service = getService(namespaceId,serviceName); removeInstance(namespaceId,serviceName,ephemeral,service,ips); } public void removeInstance(String namespaceId,Service service,Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId,ephemeral); List<Instance> instanceList = substractIpAddresses(service,ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); consistencyService.put(key,instances); } public List<Instance> substractIpAddresses(Service service,Instance... ips) throws NacosException { return
updateIpAddresses(service,UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE,ips); } public List<Instance> updateIpAddresses(Service service,String action,Instance... ips) throws NacosException { Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),service.getName(),ephemeral)); Map<String,Instance> oldInstanceMap = new HashMap<>(16); List<Instance> currentIPs = service.allIPs(ephemeral); Map<String,Instance> map = new ConcurrentHashMap<>(currentIPs.size()); for
(Instance instance : currentIPs) { map.put(instance.toIPAddr(),instance); } if (datum != null) { oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(),map); } // use HashMap for deep copy: HashMap<String,Instance> instanceMap = new HashMap<>(oldInstanceMap.size()); instanceMap.putAll(oldInstanceMap); for (Instance instance : ips) { if (!service.getClusterMap().containsKey(instance.getClusterName())) { Cluster cluster = new Cluster(instance.getClusterName(),service); cluster.init(); service.getClusterMap().put(instance.getClusterName(),cluster); Loggers.SRV_LOG.warn("cluster: {} not found,ip: {},will create new cluster with default configuration.",instance.getClusterName(),instance.toJSON()); } if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } else { instanceMap.put(instance.getDatumKey(),instance); } } if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException("ip list can not be empty,service: " + service.getName() + ",ip list: " + JSON.toJSONString(instanceMap.values())); } return new ArrayList<>(instanceMap.values()); } //...... } 複製程式碼
  • removeInstance方法通過substractIpAddresses獲取移除執行instances之後的instanceList,然後通過consistencyService.put方法進行更新;substractIpAddresses方法執行updateIpAddresses方法,其action引數值為UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE,它先使用service.allIPs(ephemeral)獲取instance列表,針對remove動作會根據instance.getDatumKey()將其從instanceMap中移除

Service.allIPs

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java

public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record,RecordListener<Instances> {

    private static final String SERVICE_NAME_SYNTAX = "[0-9a-zA-Z@\\.:_-]+";

    @JSONField(serialize = false)
    private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);

    private String token;
    private List<String> owners = new ArrayList<>();
    private Boolean resetWeight = false;
    private Boolean enabled = true;
    private Selector selector = new NoneSelector();
    private String namespaceId;

    /**
     * IP will be deleted if it has not send beat for some time,default timeout is 30 seconds.
     */
    private long ipDeleteTimeout = 30 * 1000;

    private volatile long lastModifiedMillis = 0L;

    private volatile String checksum;

    /**
     * TODO set customized push expire time:
     */
    private long pushCacheMillis = 0L;

    private Map<String,Cluster> clusterMap = new HashMap<>();

    //......

    public List<Instance> allIPs(boolean ephemeral) {
        List<Instance> allIPs = new ArrayList<>();
        for (Map.Entry<String,Cluster> entry : clusterMap.entrySet()) {
            allIPs.addAll(entry.getValue().allIPs(ephemeral));
        }

        return allIPs;
    }

    //......
}
複製程式碼
  • Service的allIPs方法會遍歷clusterMap,然後通過Cluster.allIPs(ephemeral)收集Instance

Cluster.allIPs

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/Cluster.java

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {

    private static final String CLUSTER_NAME_SYNTAX = "[0-9a-zA-Z-]+";
    /**
     * a addition for same site routing,can group multiple sites into a region,like Hangzhou,Shanghai,etc.
     */
    private String sitegroup = StringUtils.EMPTY;

    private int defCkport = 80;

    private int defIPPort = -1;

    @JSONField(serialize = false)
    private HealthCheckTask checkTask;

    @JSONField(serialize = false)
    private Set<Instance> persistentInstances = new HashSet<>();

    @JSONField(serialize = false)
    private Set<Instance> ephemeralInstances = new HashSet<>();

    @JSONField(serialize = false)
    private Service service;

    @JSONField(serialize = false)
    private volatile boolean inited = false;

    private Map<String,String> metadata = new ConcurrentHashMap<>();

    //......

    public List<Instance> allIPs(boolean ephemeral) {
        return ephemeral ? new ArrayList<>(ephemeralInstances) : new ArrayList<>(persistentInstances);
    }

    //......
}
複製程式碼
  • Cluster的allIPs方法根據ephemeral來返回ephemeralInstances或者persistentInstances

小結

ServiceManager的removeInstance方法通過substractIpAddresses獲取移除執行instances之後的instanceList,然後通過consistencyService.put方法進行更新;substractIpAddresses方法執行updateIpAddresses方法,其action引數值為UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE,它先使用service.allIPs(ephemeral)獲取instance列表,針對remove動作會根據instance.getDatumKey()將其從instanceMap中移除

doc