A Look at updateInstance in nacos ServiceManager
阿新 • Published: 2019-12-31
Preface
This article takes a look at updateInstance in the nacos ServiceManager.
ServiceManager
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    //......

    public void updateInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        if (!service.allIPs().contains(instance)) {
            throw new NacosException(NacosException.INVALID_PARAM, "instance not exist: " + instance);
        }
        // An update of an existing instance reuses the add path.
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        Service service = getService(namespaceId, serviceName);
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        consistencyService.put(key, instances);
    }

    public List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }

    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
        Map<String, Instance> oldInstanceMap = new HashMap<>(16);
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());
        for (Instance instance : currentIPs) {
            map.put(instance.toIPAddr(), instance);
        }
        if (datum != null) {
            oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
        }
        // use HashMap for deep copy:
        HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
        instanceMap.putAll(oldInstanceMap);
        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName(), service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJSON());
            }
            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());
            } else {
                // put() overwrites an entry with the same datum key, which is how an update takes effect
                instanceMap.put(instance.getDatumKey(), instance);
            }
        }
        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "
                + JSON.toJSONString(instanceMap.values()));
        }
        return new ArrayList<>(instanceMap.values());
    }

    //......
}
- updateInstance uses service.allIPs().contains(instance) to check that the instance to be updated exists; if it does not, a NacosException is thrown, otherwise addInstance is called
- addInstance looks up the service, calls addIpAddresses, and finally calls consistencyService.put; addIpAddresses delegates to updateIpAddresses with the action UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD
- updateIpAddresses first reads the datum from consistencyService, then gets currentIPs from service.allIPs and, if a datum exists, builds oldInstanceMap from it; for UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE it removes the instance from instanceMap, and for any other action it puts the instance into instanceMap
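The point that an update is just an existence check followed by the add path can be illustrated with a minimal sketch. SimpleInstance and InstanceRegistrySketch are hypothetical names invented for this example and are not Nacos classes; the existence check here is a key lookup, whereas the listing above relies on service.allIPs().contains(instance).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for an instance: identity is ip:port,
// weight is an attribute that an update may change.
class SimpleInstance {
    final String ip;
    final int port;
    final double weight;

    SimpleInstance(String ip, int port, double weight) {
        this.ip = ip;
        this.port = port;
        this.weight = weight;
    }

    // Plays the role of the map key (compare getDatumKey() in the listing above).
    String key() {
        return ip + ":" + port;
    }
}

class InstanceRegistrySketch {
    // Plays the role of instanceMap: one entry per instance key.
    private final Map<String, SimpleInstance> stored = new HashMap<>();

    // "Add" is a put: a new key is inserted, an existing key is overwritten.
    List<SimpleInstance> add(SimpleInstance... instances) {
        for (SimpleInstance instance : instances) {
            stored.put(instance.key(), instance);
        }
        return new ArrayList<>(stored.values());
    }

    // "Update" only places an existence check in front of the same add path.
    List<SimpleInstance> update(SimpleInstance instance) {
        if (!stored.containsKey(instance.key())) {
            throw new IllegalArgumentException("instance not exist: " + instance.key());
        }
        return add(instance);
    }

    SimpleInstance get(String key) {
        return stored.get(key);
    }
}
```

Because the map is keyed by instance identity, updating an instance never grows the list; it only swaps the entry stored under that key.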
DistroConsistencyServiceImpl.put
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {

    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.distro.notifier");
            return t;
        }
    });

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private DataStore dataStore;

    @Autowired
    private TaskDispatcher taskDispatcher;

    @Autowired
    private DataSyncer dataSyncer;

    @Autowired
    private Serializer serializer;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private GlobalConfig globalConfig;

    private boolean initialized = false;

    public volatile Notifier notifier = new Notifier();

    private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

    private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);

    //......

    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        taskDispatcher.addTask(key);
    }

    public void onPut(String key, Record value) {
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum);
        }
        if (!listeners.containsKey(key)) {
            return;
        }
        notifier.addTask(key, ApplyAction.CHANGE);
    }

    //......
}
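- The put method of DistroConsistencyServiceImpl runs onPut first and then taskDispatcher.addTask(key); when the key is an ephemeral instance list key, onPut creates a Datum, increments its timestamp and stores it in dataStore; it then calls notifier.addTask(key, ApplyAction.CHANGE), but only if a listener is registered for that key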
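The order of operations in put and onPut (store locally, notify only when someone listens, then always hand the key to the dispatcher) can be sketched as follows. VersionedDatum and LocalPutSketch are hypothetical names for this example; the ephemeral-key check is left out, and the two lists stand in for notifier.addTask and taskDispatcher.addTask.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a datum: a key, a value and a version counter.
class VersionedDatum {
    final String key;
    final Object value;
    final AtomicLong timestamp = new AtomicLong(0L);

    VersionedDatum(String key, Object value) {
        this.key = key;
        this.value = value;
    }
}

class LocalPutSketch {
    private final Map<String, VersionedDatum> dataStore = new ConcurrentHashMap<>();
    private final Set<String> keysWithListeners = new HashSet<>();

    // Recorded instead of calling a real notifier or dispatcher.
    final List<String> notified = new ArrayList<>();
    final List<String> dispatched = new ArrayList<>();

    void listen(String key) {
        keysWithListeners.add(key);
    }

    void put(String key, Object value) {
        onPut(key, value);
        dispatched.add(key); // every put is queued for replication
    }

    void onPut(String key, Object value) {
        VersionedDatum datum = new VersionedDatum(key, value);
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
        if (!keysWithListeners.contains(key)) {
            return; // nobody listens on this key, so no change notification
        }
        notified.add(key);
    }

    VersionedDatum get(String key) {
        return dataStore.get(key);
    }
}
```

In this sketch the local write and the replication request happen on every put, while the change notification depends on a listener being present.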
Notifier.addTask
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
public class Notifier implements Runnable {

    private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

    private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

    public void addTask(String datumKey, ApplyAction action) {
        // a CHANGE for this key is already queued, so skip the duplicate
        if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
            return;
        }
        if (action == ApplyAction.CHANGE) {
            services.put(datumKey, StringUtils.EMPTY);
        }
        tasks.add(Pair.with(datumKey, action));
    }

    public int getTaskSize() {
        return tasks.size();
    }

    @Override
    public void run() {
        Loggers.DISTRO.info("distro notifier started");
        while (true) {
            try {
                Pair pair = tasks.take();
                if (pair == null) {
                    continue;
                }
                String datumKey = (String) pair.getValue0();
                ApplyAction action = (ApplyAction) pair.getValue1();
                services.remove(datumKey);
                int count = 0;
                if (!listeners.containsKey(datumKey)) {
                    continue;
                }
                for (RecordListener listener : listeners.get(datumKey)) {
                    count++;
                    try {
                        if (action == ApplyAction.CHANGE) {
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }
                        if (action == ApplyAction.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                        datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
}
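- The addTask method of Notifier skips a CHANGE for a key that is already in services; otherwise a CHANGE key is recorded in services, and the task is appended to tasks; the run method keeps taking entries from tasks, removes the key from services, and invokes the matching callback (onChange or onDelete) on each listener registered for the key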
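The effect of the services map is that repeated CHANGE events for a key collapse into one queued task until that task is consumed. A minimal sketch of this coalescing, with the hypothetical names ChangeCoalescerSketch and pollOnce (a non-blocking single step standing in for one iteration of the run loop):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

class ChangeCoalescerSketch {
    enum Action { CHANGE, DELETE }

    // Hypothetical task holder, used here in place of a Pair.
    static class Task {
        final String key;
        final Action action;

        Task(String key, Action action) {
            this.key = key;
            this.action = action;
        }
    }

    // Keys that currently have a CHANGE waiting in the queue.
    private final ConcurrentHashMap<String, String> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>(1024);

    void addTask(String key, Action action) {
        if (pending.containsKey(key) && action == Action.CHANGE) {
            return; // a CHANGE for this key is already queued
        }
        if (action == Action.CHANGE) {
            pending.put(key, "");
        }
        tasks.add(new Task(key, action));
    }

    // One consumer step: take a task if present and clear its pending marker,
    // so that later CHANGE events for the key are queued again.
    String pollOnce() {
        Task task = tasks.poll();
        if (task == null) {
            return null;
        }
        pending.remove(task.key);
        return task.key + ":" + task.action;
    }

    int size() {
        return tasks.size();
    }
}
```

DELETE tasks are never coalesced in this scheme; only CHANGE is, since a listener handling CHANGE reads the latest stored value anyway.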
TaskDispatcher.addTask
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/TaskDispatcher.java
@Component
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    @PostConstruct
    public void init() {
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    public void addTask(String key) {
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    public class TaskScheduler implements Runnable {

        private int index;

        private int dataSize = 0;

        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            queue.offer(key);
        }

        public int getIndex() {
            return index;
        }

        @Override
        public void run() {
            List<String> keys = new ArrayList<>();
            while (true) {
                try {
                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(), TimeUnit.MILLISECONDS);
                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}", key);
                    }
                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }
                    if (StringUtils.isBlank(key)) {
                        continue;
                    }
                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }
                    keys.add(key);
                    dataSize++;
                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());
                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }
                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }
                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }
}
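- The addTask method of TaskDispatcher picks a TaskScheduler from taskSchedulerList via UtilsAndCommons.shakeUp(key, cpuCoreCount) and calls its addTask, which offers the key to queue; the run method keeps polling queue, and once the batch reaches getBatchSyncKeyCount() keys or more than getTaskDispatchPeriod() milliseconds have passed since the last dispatch, it submits one SyncTask per remote server through dataSyncer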
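The two ideas in TaskDispatcher, routing a key to a fixed scheduler and flushing a batch on either a size or a time threshold, can be sketched in isolation. BatchingSketch, shardFor and accept are hypothetical names; shardFor is only an assumed hash-and-modulo shape, since the body of UtilsAndCommons.shakeUp is not shown in this article, and the clock is passed in as a parameter so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

class BatchingSketch {
    // Assumption: a non-negative hash modulo the shard count. The same key
    // always maps to the same shard, so per-key ordering stays in one queue.
    static int shardFor(String key, int shardCount) {
        return (key.hashCode() & Integer.MAX_VALUE) % shardCount;
    }

    private final int batchSize;
    private final long periodMillis;

    private List<String> keys = new ArrayList<>();
    private int dataSize = 0;
    private long lastDispatchTime = 0L;

    // Batches that would have been turned into sync tasks.
    final List<List<String>> flushed = new ArrayList<>();

    BatchingSketch(int batchSize, long periodMillis) {
        this.batchSize = batchSize;
        this.periodMillis = periodMillis;
    }

    // Mirrors the body of the scheduler loop for one non-blank key.
    void accept(String key, long nowMillis) {
        if (dataSize == 0) {
            keys = new ArrayList<>(); // start a fresh batch after a flush
        }
        keys.add(key);
        dataSize++;
        if (dataSize == batchSize || (nowMillis - lastDispatchTime) > periodMillis) {
            flushed.add(keys);
            lastDispatchTime = nowMillis;
            dataSize = 0;
        }
    }
}
```

Since lastDispatchTime starts at 0, the first key handled after startup satisfies the time condition and is flushed on its own; later keys accumulate until one of the two thresholds is hit. Note that the time condition is only evaluated when a key arrives.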
SyncTask
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/SyncTask.java
public class SyncTask {

    private List<String> keys;

    private int retryCount;

    private long lastExecuteTime;

    private String targetServer;

    public List<String> getKeys() {
        return keys;
    }

    public void setKeys(List<String> keys) {
        this.keys = keys;
    }

    public int getRetryCount() {
        return retryCount;
    }

    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }

    public long getLastExecuteTime() {
        return lastExecuteTime;
    }

    public void setLastExecuteTime(long lastExecuteTime) {
        this.lastExecuteTime = lastExecuteTime;
    }

    public String getTargetServer() {
        return targetServer;
    }

    public void setTargetServer(String targetServer) {
        this.targetServer = targetServer;
    }
}
- SyncTask carries the keys and targetServer properties (along with retryCount and lastExecuteTime); targetServer tells DataSyncer which server the sync should be sent to
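How a batch of keys turns into one task per remote server can be sketched as below. FanOutSketch, Task and tasksFor are hypothetical names for this example, and servers are plain strings rather than Nacos Server objects.

```java
import java.util.ArrayList;
import java.util.List;

class FanOutSketch {
    // Hypothetical, reduced version of a sync task: which keys go to which server.
    static class Task {
        final List<String> keys;
        final String targetServer;

        Task(List<String> keys, String targetServer) {
            this.keys = keys;
            this.targetServer = targetServer;
        }
    }

    // One task per server except the local one; all tasks share the same key batch.
    static List<Task> tasksFor(List<String> keys, List<String> servers, String localServer) {
        List<Task> tasks = new ArrayList<>();
        for (String server : servers) {
            if (localServer.equals(server)) {
                continue; // the local node already holds the data
            }
            tasks.add(new Task(keys, server));
        }
        return tasks;
    }
}
```

With three servers and one of them local, a batch yields two tasks, each naming a different target.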
Summary
- updateInstance uses service.allIPs().contains(instance) to check that the instance to be updated exists; if it does not, a NacosException is thrown, otherwise addInstance is called
- addInstance looks up the service, calls addIpAddresses, and finally calls consistencyService.put; addIpAddresses delegates to updateIpAddresses with the action UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD
- updateIpAddresses first reads the datum from consistencyService, then gets currentIPs from service.allIPs and, if a datum exists, builds oldInstanceMap from it; for UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE it removes the instance from instanceMap, and for any other action it puts the instance into instanceMap