A look at nacos ServiceManager's UpdatedServiceProcessor
阿新 • Published: 2019-12-31
Preface
This article examines the UpdatedServiceProcessor in nacos ServiceManager.
ServiceManager.init
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    @PostConstruct
    public void init() {
        UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
        UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());
        try {
            Loggers.SRV_LOG.info("listen for service meta change");
            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
        } catch (NacosException e) {
            Loggers.SRV_LOG.error("listen for service meta change failed!");
        }
    }

    //......
}
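- The init method of ServiceManager submits an UpdatedServiceProcessor task to UtilsAndCommons.SERVICE_UPDATE_EXECUTOR. It also schedules a ServiceReporter to run after 60000 ms and registers the ServiceManager as a listener for service meta changes.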
UpdatedServiceProcessor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
    private class UpdatedServiceProcessor implements Runnable {
        //get changed service from other server asynchronously
        @Override
        public void run() {
            ServiceKey serviceKey = null;

            try {
                while (true) {
                    try {
                        serviceKey = toBeUpdatedServicesQueue.take();
                    } catch (Exception e) {
                        Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
                    }

                    if (serviceKey == null) {
                        continue;
                    }
                    GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
                }
            } catch (Exception e) {
                Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
            }
        }
    }
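- UpdatedServiceProcessor implements the Runnable interface. Its run method loops forever, blocking on toBeUpdatedServicesQueue.take() to fetch the next ServiceKey, and then hands a new ServiceUpdater for that key to GlobalExecutor.submitServiceUpdate.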
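The take-then-dispatch loop described above can be sketched with plain JDK classes. This is only a minimal illustration, not the nacos implementation: the class QueueDispatcherSketch, its String keys, the queue capacity, and the two-thread worker pool are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the dispatcher loop; names and sizes are illustrative only.
class QueueDispatcherSketch {
    // Bounded queue of pending keys (plain Strings here instead of ServiceKey).
    private final LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(1024);
    // One long-lived thread that only takes keys and hands them off.
    private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();
    // Separate pool that does the per-key work.
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final List<String> processed = new CopyOnWriteArrayList<>();

    void start(CountDownLatch done) {
        dispatcher.execute(() -> {
            try {
                while (true) {
                    String key = queue.take(); // blocks until a key is available
                    workers.execute(() -> {
                        processed.add(key);    // placeholder for the real update work
                        done.countDown();
                    });
                }
            } catch (InterruptedException e) {
                // Unlike the loop in the article, this sketch stops when interrupted.
                Thread.currentThread().interrupt();
            }
        });
    }

    void stop() {
        dispatcher.shutdownNow(); // interrupts the blocked take()
        workers.shutdown();
    }

    // Enqueues three made-up keys and returns them, sorted, once all were processed.
    static List<String> demo() {
        QueueDispatcherSketch sketch = new QueueDispatcherSketch();
        CountDownLatch done = new CountDownLatch(3);
        sketch.start(done);
        try {
            sketch.queue.offer("service-a");
            sketch.queue.offer("service-b");
            sketch.queue.offer("service-c");
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for workers");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            sketch.stop();
        }
        List<String> result = new ArrayList<>(sketch.processed);
        Collections.sort(result);
        return result;
    }
}
```

Keeping the dispatcher thread free of real work means a slow update for one key does not delay taking the next key from the queue. The sketch exits its loop on interruption, whereas the code quoted in the article logs the exception from take() and keeps looping.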
ServiceUpdater
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
    private class ServiceUpdater implements Runnable {

        String namespaceId;
        String serviceName;
        String serverIP;

        public ServiceUpdater(ServiceKey serviceKey) {
            this.namespaceId = serviceKey.getNamespaceId();
            this.serviceName = serviceKey.getServiceName();
            this.serverIP = serviceKey.getServerIP();
        }

        @Override
        public void run() {
            try {
                updatedHealthStatus(namespaceId, serviceName, serverIP);
            } catch (Exception e) {
                // serviceName fills the first placeholder so the arguments match the message
                Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update service: {} from {},error: {}", serviceName, serverIP, e);
            }
        }
    }
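- ServiceUpdater implements the Runnable interface. It copies namespaceId, serviceName and serverIP from the ServiceKey, and its run method calls updatedHealthStatus with them, logging a warning if the call throws.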
ServiceManager.updatedHealthStatus
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    //......

    public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
        // fetch the status message for this service from the given server
        Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        JSONObject serviceJson = JSON.parseObject(msg.getData());

        // each entry is split on "_" into an address key and a health flag
        JSONArray ipList = serviceJson.getJSONArray("ips");
        Map<String, String> ipsMap = new HashMap<>(ipList.size());
        for (int i = 0; i < ipList.size(); i++) {
            String ip = ipList.getString(i);
            String[] strings = ip.split("_");
            ipsMap.put(strings[0], strings[1]);
        }

        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            return;
        }

        // apply the reported health flag to every local instance that differs
        boolean changed = false;
        List<Instance> instances = service.allIPs();
        for (Instance instance : instances) {
            boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIPAddr()));
            if (valid != instance.isHealthy()) {
                changed = true;
                instance.setHealthy(valid);
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}@{}{}", (instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(), instance.getClusterName());
            }
        }

        if (changed) {
            pushService.serviceChanged(service);
        }

        StringBuilder stringBuilder = new StringBuilder();
        List<Instance> allIps = service.allIPs();
        for (Instance instance : allIps) {
            stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");
        }

        if (changed && Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[HEALTH-STATUS-UPDATED] namespace: {},service: {},ips: {}", service.getNamespaceId(), service.getName(), stringBuilder.toString());
        }
    }

    //......
}
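- The updatedHealthStatus method fetches a msg from the synchronizer and builds ipsMap from its "ips" array. It then obtains the instances via service.allIPs() and, for each instance, looks up the valid flag in ipsMap. If the flag differs from instance.isHealthy(), it marks changed and updates the instance's healthy state. When anything changed, it calls pushService.serviceChanged(service) to publish the change, and finally logs the resulting instance states at debug level.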
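The parse-and-compare step above can be sketched in isolation as follows. This is a simplified illustration rather than the nacos code: HealthSyncSketch and Node are hypothetical, and the entry shape "ip:port_flag" is an assumption (the article's code only shows that each entry is split on "_" into a key and a flag).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the health-status reconciliation step.
class HealthSyncSketch {

    // Minimal stand-in for an instance; not the nacos Instance class.
    static final class Node {
        final String addr;   // assumed to look like "ip:port"
        boolean healthy;

        Node(String addr, boolean healthy) {
            this.addr = addr;
            this.healthy = healthy;
        }
    }

    // Turns entries such as "10.0.0.1:8080_true" into a map of addr -> flag.
    static Map<String, String> parse(List<String> entries) {
        Map<String, String> flags = new HashMap<>(entries.size());
        for (String entry : entries) {
            String[] parts = entry.split("_");
            if (parts.length != 2) {
                continue; // the sketch skips malformed entries; the quoted code indexes directly
            }
            flags.put(parts[0], parts[1]);
        }
        return flags;
    }

    // Applies the remote flags to the local nodes; returns true if any node changed.
    static boolean reconcile(List<Node> local, Map<String, String> remote) {
        boolean changed = false;
        for (Node node : local) {
            // Boolean.parseBoolean(null) is false, so a node missing from the map reads as unhealthy.
            boolean valid = Boolean.parseBoolean(remote.get(node.addr));
            if (valid != node.healthy) {
                node.healthy = valid;
                changed = true;
            }
        }
        return changed;
    }
}
```

One consequence visible in the sketch also follows from the quoted code: an instance that does not appear in the message is treated as unhealthy, because Boolean.parseBoolean returns false for a null argument. The changed flag is what gates the push notification, so repeated messages with identical contents do not trigger another push.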
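Summary
- The init method of ServiceManager submits an UpdatedServiceProcessor task to UtilsAndCommons.SERVICE_UPDATE_EXECUTOR
- UpdatedServiceProcessor implements the Runnable interface; its run method loops forever taking elements from toBeUpdatedServicesQueue and submits a ServiceUpdater for each one through GlobalExecutor.submitServiceUpdate
- ServiceUpdater implements the Runnable interface; its run method calls updatedHealthStatus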