A look at nacos's ServiceReporter
阿新 • Published: 2019-12-31
Preface
This article takes a look at the ServiceReporter in nacos.
ServiceManager.init
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    @PostConstruct
    public void init() {

        UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

        UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());

        try {
            Loggers.SRV_LOG.info("listen for service meta change");
            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
        } catch (NacosException e) {
            Loggers.SRV_LOG.error("listen for service meta change failed!");
        }
    }

    //......
}
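- The init method of ServiceManager schedules a ServiceReporter on UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR with an initial delay of 60000 milliseconds. It also submits an UpdatedServiceProcessor to UtilsAndCommons.SERVICE_UPDATE_EXECUTOR and registers itself as a listener for service meta changes.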
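The scheduling pattern used here (a one-shot schedule call, with the task re-registering itself at the end of each run) can be sketched with the plain JDK. This is a minimal illustration, not nacos code: the class SelfReschedulingDemo, the nested Reporter, the maxRuns cap, and the short delays are all hypothetical names and values chosen so that the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SelfReschedulingDemo {

    /** Hypothetical task that re-registers itself in finally, like ServiceReporter does. */
    static class Reporter implements Runnable {
        private final ScheduledExecutorService executor;
        private final long periodMillis;
        private final int maxRuns;               // cap added only so the demo stops
        final CountDownLatch done;
        final AtomicInteger runs = new AtomicInteger();

        Reporter(ScheduledExecutorService executor, long periodMillis, int maxRuns) {
            this.executor = executor;
            this.periodMillis = periodMillis;
            this.maxRuns = maxRuns;
            this.done = new CountDownLatch(maxRuns);
        }

        @Override
        public void run() {
            int n = 0;
            try {
                n = runs.incrementAndGet();
                if (n == 2) {
                    // Simulate a failing run; the next run must still be scheduled.
                    throw new IllegalStateException("simulated failure");
                }
            } catch (RuntimeException e) {
                // ServiceReporter logs the exception here; the demo just swallows it.
            } finally {
                done.countDown();
                if (n < maxRuns) {
                    // One-shot schedule again: the period can change between runs.
                    executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    /** Runs the reporter until it has executed maxRuns times and returns the run count. */
    static int runReporter(int maxRuns) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        Reporter reporter = new Reporter(executor, 10, maxRuns);
        // Initial delay, analogous to the 60000 ms used in ServiceManager.init.
        executor.schedule(reporter, 20, TimeUnit.MILLISECONDS);
        try {
            reporter.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return reporter.runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs = " + runReporter(3));
    }
}
```

Compared with scheduleAtFixedRate, rescheduling from finally lets each run pick up a new delay value and keeps the task alive after a failed run, which is presumably why nacos reads the period from switchDomain on every iteration.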
ServiceReporter
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
private class ServiceReporter implements Runnable {

    @Override
    public void run() {
        try {

            Map<String, Set<String>> allServiceNames = getAllServiceNames();

            if (allServiceNames.size() <= 0) {
                //ignore
                return;
            }

            for (String namespaceId : allServiceNames.keySet()) {

                ServiceChecksum checksum = new ServiceChecksum(namespaceId);

                for (String serviceName : allServiceNames.get(namespaceId)) {
                    if (!distroMapper.responsible(serviceName)) {
                        continue;
                    }

                    Service service = getService(namespaceId, serviceName);

                    if (service == null) {
                        continue;
                    }

                    service.recalculateChecksum();

                    checksum.addItem(serviceName, service.getChecksum());
                }

                Message msg = new Message();

                msg.setData(JSON.toJSONString(checksum));

                List<Server> sameSiteServers = serverListManager.getServers();

                if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                    return;
                }

                for (Server server : sameSiteServers) {
                    if (server.getKey().equals(NetUtils.localServer())) {
                        continue;
                    }
                    synchronizer.send(server.getKey(), msg);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
        } finally {
            UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(), TimeUnit.MILLISECONDS);
        }
    }
}
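- ServiceReporter implements the Runnable interface. Its run method iterates over allServiceNames and, for each namespace, picks the service names for which distroMapper.responsible returns true, calls recalculateChecksum on each of those services, and adds the resulting checksum to a ServiceChecksum. It then wraps the ServiceChecksum in a Message and sends it via synchronizer.send to every server returned by serverListManager.getServers() except the local one. Finally, in the finally block, it re-registers itself with UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR, using switchDomain.getServiceStatusSynchronizationPeriodMillis() as the delay.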
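The two filtering steps in run (report only the services this node is responsible for, and send to every server except the local one) can be sketched with plain collections. This is a simplified illustration under stated assumptions: ChecksumReportDemo, buildReport and targets are hypothetical names, a Predicate stands in for distroMapper.responsible, and a Map of precomputed strings stands in for Service.getChecksum.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class ChecksumReportDemo {

    /**
     * Collects serviceName -> checksum for the services this node is responsible for.
     * The predicate is a stand-in for distroMapper.responsible(serviceName).
     */
    static Map<String, String> buildReport(Map<String, String> checksumsByService,
                                           Predicate<String> responsible) {
        Map<String, String> report = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : checksumsByService.entrySet()) {
            if (!responsible.test(entry.getKey())) {
                continue; // another node reports this service
            }
            report.put(entry.getKey(), entry.getValue());
        }
        return report;
    }

    /** Returns every server key except the local one, preserving order. */
    static List<String> targets(List<String> serverKeys, String localKey) {
        List<String> result = new ArrayList<>();
        for (String key : serverKeys) {
            if (key.equals(localKey)) {
                continue; // never send the report to ourselves
            }
            result.add(key);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> checksums = new LinkedHashMap<>();
        checksums.put("order-service", "c1");
        checksums.put("user-service", "c2");

        // Hypothetical responsibility rule, purely for the demo.
        Map<String, String> report = buildReport(checksums, name -> name.startsWith("order"));
        List<String> peers = targets(
            Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848"), "10.0.0.2:8848");

        System.out.println(report + " -> " + peers);
    }
}
```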
ServiceStatusSynchronizer
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/misc/ServiceStatusSynchronizer.java
public class ServiceStatusSynchronizer implements Synchronizer {

    @Override
    public void send(final String serverIP, Message msg) {
        if (serverIP == null) {
            return;
        }

        Map<String, String> params = new HashMap<String, String>(10);

        params.put("statuses", msg.getData());
        params.put("clientIP", NetUtils.localServer());

        String url = "http://" + serverIP + ":" + RunningConfig.getServerPort() + RunningConfig.getContextPath() +
            UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";

        if (serverIP.contains(UtilsAndCommons.IP_PORT_SPLITER)) {
            url = "http://" + serverIP + RunningConfig.getContextPath() +
                UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
        }

        try {
            HttpClient.asyncHttpPostLarge(url, null, JSON.toJSONString(params), new AsyncCompletionHandler() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus,remote server: {}", serverIP);
                        return 1;
                    }
                    return 0;
                }
            });
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus,remote server: " + serverIP, e);
        }
    }

    @Override
    public Message get(String serverIP, String key) {
        if (serverIP == null) {
            return null;
        }

        Map<String, String> params = new HashMap<>(10);

        params.put("key", key);

        String result;
        try {
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("[STATUS-SYNCHRONIZE] sync service status from: {},service: {}", serverIP, key);
            }
            result = NamingProxy.reqAPI(RunningConfig.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance/" + "statuses", params, serverIP);
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] Failed to get service status from " + serverIP, e);
            return null;
        }

        if (result == null || result.equals(StringUtils.EMPTY)) {
            return null;
        }

        Message msg = new Message();
        msg.setData(result);

        return msg;
    }
}
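- ServiceStatusSynchronizer implements the Synchronizer interface. Its send method puts the message data (statuses) and the local server address (clientIP) into a parameter map, builds the /service/status URL of the target server (appending the local server port when serverIP carries no port of its own), and issues an asynchronous POST request; a non-200 response or an exception is only logged as a warning. Its get method requests /instance/statuses from the target server via NamingProxy.reqAPI and wraps a non-empty result in a Message.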
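The URL selection in send can be isolated as a small pure function. This is a sketch, not the nacos implementation: StatusUrlDemo and statusUrl are hypothetical names, and the concrete values used below (":" for UtilsAndCommons.IP_PORT_SPLITER, "/nacos" for the context path, "/v1/ns" for UtilsAndCommons.NACOS_NAMING_CONTEXT, 8848 for the server port) are assumptions about a default setup rather than values shown in the source above.

```java
class StatusUrlDemo {

    // Assumed value of UtilsAndCommons.IP_PORT_SPLITER; not shown in the excerpt.
    static final String IP_PORT_SPLITER = ":";

    /**
     * Builds the status URL the way send does: if serverIP already contains
     * a port, use it as is; otherwise append the local server port.
     */
    static String statusUrl(String serverIP, int localPort, String contextPath, String namingContext) {
        if (serverIP.contains(IP_PORT_SPLITER)) {
            return "http://" + serverIP + contextPath + namingContext + "/service/status";
        }
        return "http://" + serverIP + ":" + localPort + contextPath + namingContext + "/service/status";
    }

    public static void main(String[] args) {
        // Assumed defaults, for illustration only.
        System.out.println(statusUrl("10.0.0.1", 8848, "/nacos", "/v1/ns"));
        System.out.println(statusUrl("10.0.0.2:8849", 8848, "/nacos", "/v1/ns"));
    }
}
```

Note that the fallback uses the port of the sending node, so a peer listed without a port is assumed to listen on the same port as the local server.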
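Summary
ServiceReporter implements the Runnable interface. Its run method iterates over allServiceNames, picks the service names for which distroMapper.responsible returns true, recalculates their checksums via recalculateChecksum, and adds them to a ServiceChecksum. It then builds a Message and sends it with synchronizer.send to each of the sameSiteServers other than the local server. Finally, it re-registers itself with UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.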