聊聊nacos的DistroConsistencyServiceImpl
阿新 • • 發佈:2019-12-31
序
本文主要研究一下nacos的DistroConsistencyServiceImpl
ConsistencyService
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java
/**
 * Contract for a consistency service that stores keyed data in a Nacos cluster
 * and notifies registered listeners about changes to that data.
 */
public interface ConsistencyService {
/**
 * Put a data related to a key to Nacos cluster.
 *
 * @param key   key of data; this key should be globally unique
 * @param value value of data
 * @throws NacosException if the operation fails (exact conditions are implementation-defined)
 */
void put(String key,Record value) throws NacosException;
/**
 * Remove a data from Nacos cluster.
 *
 * @param key key of data
 * @throws NacosException if the operation fails (exact conditions are implementation-defined)
 */
void remove(String key) throws NacosException;
/**
 * Get a data from Nacos cluster.
 *
 * @param key key of data
 * @return data related to the key
 * @throws NacosException if the operation fails (exact conditions are implementation-defined)
 */
Datum get(String key) throws NacosException;
/**
 * Listen for changes of a data.
 *
 * @param key      key of data
 * @param listener callback of data change
 * @throws NacosException if the operation fails (exact conditions are implementation-defined)
 */
void listen(String key,RecordListener listener) throws NacosException;
/**
 * Cancel listening of a data.
 *
 * @param key      key of data
 * @param listener callback of data change that was previously registered
 * @throws NacosException if the operation fails (exact conditions are implementation-defined)
 */
void unlisten(String key,RecordListener listener) throws NacosException;
/**
 * Tell the status of this consistency service.
 *
 * @return true if available
 */
boolean isAvailable();
}
複製程式碼
- ConsistencyService定義了put、remove、get、listen、unlisten、isAvailable方法
EphemeralConsistencyService
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/EphemeralConsistencyService.java
/**
 * Sub-interface of {@link ConsistencyService} that declares no additional methods;
 * it only gives implementations a distinct type to be referenced by.
 * NOTE(review): judging by the name it is meant for ephemeral (non-persistent) data - confirm.
 */
public interface EphemeralConsistencyService extends ConsistencyService {
}
複製程式碼
- EphemeralConsistencyService介面繼承了ConsistencyService介面
DistroConsistencyServiceImpl
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
// Scheduled pool with a core size of 1. Its thread factory creates daemon threads named
// "com.alibaba.nacos.naming.distro.notifier"; init() submits the notifier to this pool.
private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1,new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.distro.notifier");
return t;
}
});
// Collaborators injected by Spring.
@Autowired
private DistroMapper distroMapper;
// Local store of datums; read by get() and written by processData().
@Autowired
private DataStore dataStore;
// Receives the key of every datum written through put().
@Autowired
private TaskDispatcher taskDispatcher;
@Autowired
private DataSyncer dataSyncer;
// Used by processData() to turn the raw bytes fetched from a peer into a datum map.
@Autowired
private Serializer serializer;
// Source of the healthy-server list that load() iterates over.
@Autowired
private ServerListManager serverListManager;
@Autowired
private SwitchDomain switchDomain;
@Autowired
private GlobalConfig globalConfig;
// Set to true by load() in standalone mode or after one successful full sync from a peer.
// NOTE(review): written from the thread that runs load() and not volatile; how it is read
// (isInitialized()) is not visible here - confirm visibility is guaranteed.
private boolean initialized = false;
// Runnable that dispatches queued change/delete notifications to the registered listeners.
public volatile Notifier notifier = new Notifier();
// Registered listeners, grouped by datum key.
private Map<String,CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();
// Not referenced by the members visible in this excerpt.
private Map<String,String> syncChecksumTasks = new ConcurrentHashMap<>(16);
/**
 * Start-up hook: hands the initial {@link #load()} to {@code GlobalExecutor} so it runs
 * asynchronously, then submits the notifier to this service's own executor.
 * A failure inside {@code load()} is logged and otherwise ignored.
 */
@PostConstruct
public void init() {
    Runnable loadTask = () -> {
        try {
            load();
        } catch (Exception e) {
            Loggers.DISTRO.error("load data failed.",e);
        }
    };
    GlobalExecutor.submit(loadTask);
    executor.submit(notifier);
}
/**
 * Performs the initial data load.
 *
 * <p>In standalone mode the service is marked initialized right away. Otherwise the method
 * polls once per second until more than one healthy server is listed, then asks each healthy
 * server other than the local one for its full data and marks the service initialized after
 * the first successful sync. If no server can be synced the method returns with
 * {@code initialized} left unchanged.
 *
 * @throws Exception propagated from the calls made here, e.g. an interrupted sleep
 */
public void load() throws Exception {
    if (SystemUtils.STANDALONE_MODE) {
        initialized = true;
        return;
    }

    // A list of size 1 contains only this node; wait until at least one other server is alive.
    while (serverListManager.getHealthyServers().size() < 2) {
        Thread.sleep(1000L);
        Loggers.DISTRO.info("waiting server list init...");
    }

    for (Server candidate : serverListManager.getHealthyServers()) {
        boolean isSelf = NetUtils.localServer().equals(candidate.getKey());
        if (!isSelf) {
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("sync from " + candidate);
            }
            // The first peer that delivers its full data set completes initialization.
            boolean synced = syncAllDataFromRemote(candidate);
            if (synced) {
                initialized = true;
                return;
            }
        }
    }
}
//......
/**
 * Fetches the complete data set from the given server and feeds it to
 * {@link #processData(byte[])}.
 *
 * @param server the server to pull the data from
 * @return {@code true} when fetching and processing finished without an exception,
 *         {@code false} otherwise (the exception is logged, not rethrown)
 */
public boolean syncAllDataFromRemote(Server server) {
    boolean synced = false;
    try {
        byte[] payload = NamingProxy.getAllData(server.getKey());
        processData(payload);
        synced = true;
    } catch (Exception e) {
        Loggers.DISTRO.error("sync full data from " + server + " failed!",e);
    }
    return synced;
}
/**
 * Applies a serialized data set received from another server.
 *
 * <p>Empty input is ignored. Otherwise the bytes are deserialized into a map of
 * key to {@code Datum<Instances>} and handled in two passes:
 * <ol>
 *   <li>every datum is written to {@code dataStore}; for keys that have no registered
 *       listener, and only when {@code switchDomain.isDefaultInstanceEphemeral()} is true,
 *       an empty {@code Service} is built and passed to the first listener registered under
 *       {@code KeyBuilder.SERVICE_META_KEY_PREFIX};</li>
 *   <li>the listeners of each key are invoked with the datum value and, if none of them
 *       throws, the datum is written to {@code dataStore} again.</li>
 * </ol>
 *
 * @param data serialized datum map; NOTE(review): a null array fails on {@code data.length}
 * @throws Exception propagated from deserialization or from the first pass
 */
public void processData(byte[] data) throws Exception {
if (data.length > 0) {
Map<String,Datum<Instances>> datumMap =
serializer.deserializeMap(data,Instances.class);
// First pass: store every datum and create missing services.
for (Map.Entry<String,Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(),entry.getValue());
if (!listeners.containsKey(entry.getKey())) {
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral()) {
// create empty service
Loggers.DISTRO.info("creating service {}",entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed,exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
// NOTE(review): throws NullPointerException when nothing is registered under
// SERVICE_META_KEY_PREFIX, and IndexOutOfBoundsException when that list is empty.
// Presumably the listener registers the new service so that it starts listening
// on this key before the second pass - confirm.
listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
.onChange(KeyBuilder.buildServiceMetaKey(namespaceId,serviceName),service);
}
}
}
// Second pass: notify the listeners of every key.
for (Map.Entry<String,Datum<Instances>> entry : datumMap.entrySet()) {
if (!listeners.containsKey(entry.getKey())) {
// Should not happen:
Loggers.DISTRO.warn("listener of {} not found.",entry.getKey());
continue;
}
try {
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(),entry.getValue().value);
}
} catch (Exception e) {
// A failing listener only skips the second store for this key; the remaining keys
// are still processed.
Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}",entry.getKey(),e);
continue;
}
// Update data store if listener executed successfully:
// NOTE(review): the same datum was already put into dataStore in the first pass, so a
// listener failure does not actually keep it out of the store.
dataStore.put(entry.getKey(),entry.getValue());
}
}
}
//......
/**
 * Stores a value under the given key by calling {@code onPut} and then passes the key to
 * {@code taskDispatcher.addTask}.
 * NOTE(review): neither helper is visible here; presumably {@code addTask} queues the key for
 * synchronization to the other servers - confirm.
 *
 * @param key   key of data
 * @param value value of data
 * @throws NacosException declared by the interface; not thrown directly by this method
 */
@Override
public void put(String key,Record value) throws NacosException {
onPut(key,value);
taskDispatcher.addTask(key);
}
/**
 * Removes the data of the given key by calling {@code onRemove} and afterwards drops every
 * listener registered for that key.
 * NOTE(review): {@code onRemove} is not visible here; unlike {@code put}, no task is handed
 * to {@code taskDispatcher} - confirm this is intended.
 *
 * @param key key of data
 * @throws NacosException declared by the interface; not thrown directly by this method
 */
@Override
public void remove(String key) throws NacosException {
onRemove(key);
listeners.remove(key);
}
/**
 * Returns whatever {@code dataStore} holds for the given key; no other server is contacted.
 *
 * @param key key of data
 * @return the result of {@code dataStore.get(key)}
 * @throws NacosException declared by the interface; not thrown directly by this method
 */
@Override
public Datum get(String key) throws NacosException {
return dataStore.get(key);
}
//......
/**
 * Registers a listener for the given key. Registering the same listener twice for one key
 * has no effect.
 *
 * <p>The per-key list is created with {@code computeIfAbsent} and the listener is added with
 * {@code addIfAbsent}, so concurrent callers cannot replace each other's list, register a
 * listener twice, or hit a {@code null} list when the key is removed in between.
 *
 * @param key      key of data
 * @param listener callback of data change
 * @throws NacosException declared by the interface; not thrown directly by this method
 */
@Override
public void listen(String key,RecordListener listener) throws NacosException {
    listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).addIfAbsent(listener);
}
/**
 * Unregisters a listener from the given key. Does nothing when the key has no listeners or
 * the listener was never registered.
 *
 * <p>The list is looked up once, so a concurrent {@code remove(key)} cannot turn a later
 * lookup into a {@code NullPointerException}.
 *
 * @param key      key of data
 * @param listener callback to remove; matched with {@code equals}
 * @throws NacosException declared by the interface; not thrown directly by this method
 */
@Override
public void unlisten(String key,RecordListener listener) throws NacosException {
    CopyOnWriteArrayList<RecordListener> registered = listeners.get(key);
    if (registered == null) {
        return;
    }
    // remove() drops the first element equal to the listener, or nothing if none matches.
    registered.remove(listener);
}
/**
 * Reports whether this service can be used.
 *
 * @return {@code true} when {@code isInitialized()} is true, or when the overridden server
 *         status read from {@code switchDomain} equals the name of {@code ServerStatus.UP}
 */
@Override
public boolean isAvailable() {
    if (isInitialized()) {
        return true;
    }
    // Not initialized yet: fall back to the overridden server status.
    return ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus());
}
//......
}
複製程式碼
- DistroConsistencyServiceImpl實現了EphemeralConsistencyService介面
- 其init方法會非同步執行load方法,該方法會執行syncAllDataFromRemote進行初始化,該方法會通過NamingProxy.getAllData獲取data,然後執行processData,它主要是執行回撥然後往dataStore新增資料;init方法最後會非同步執行Notifier
- 其put方法會執行onPut方法及taskDispatcher.addTask(key);其remove方法會執行onRemove方法及listeners.remove(key);其get方法直接從dataStore讀取;其listen會新增RecordListener;其unlisten則會移除RecordListener;其isAvailable會通過isInitialized及ServerStatus.UP狀態來判斷
Notifier
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
/**
 * Queue-backed worker that delivers change and delete notifications for datum keys to the
 * listeners registered in the enclosing service.
 *
 * <p>Producers call {@link #addTask(String, ApplyAction)}; {@link #run()} consumes the queue
 * in an endless loop.
 */
public class Notifier implements Runnable {

    /** Keys that have a CHANGE task queued; lets {@code addTask} drop duplicate CHANGE tasks. */
    private ConcurrentHashMap<String,String> services = new ConcurrentHashMap<>(10 * 1024);

    /** Pending (datumKey, action) pairs, bounded to 1024 * 1024 entries. */
    private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

    /**
     * Queues a notification task. A CHANGE task is skipped when a CHANGE for the same key is
     * already marked as pending; any other action is always queued.
     *
     * <p>NOTE(review): {@code tasks.add} throws {@code IllegalStateException} when the queue is
     * full. In that case the pending marker stays in {@code services} until some queued task
     * for the same key is consumed - confirm this is acceptable.
     *
     * @param datumKey key of the datum that changed
     * @param action   what happened to the datum
     */
    public void addTask(String datumKey,ApplyAction action) {
        if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
            return;
        }
        if (action == ApplyAction.CHANGE) {
            services.put(datumKey,StringUtils.EMPTY);
        }
        tasks.add(Pair.with(datumKey,action));
    }

    /**
     * @return number of tasks currently waiting in the queue
     */
    public int getTaskSize() {
        return tasks.size();
    }

    /**
     * Takes tasks from the queue forever and invokes every listener registered for the task's
     * key. A failure in one listener is logged and does not stop the remaining listeners; a
     * failure while handling a task is logged and the loop moves on to the next task.
     *
     * <p>NOTE(review): because every {@code Throwable} is caught, an {@code InterruptedException}
     * from {@code take()} is logged too and the loop keeps running.
     */
    @Override
    public void run() {
        Loggers.DISTRO.info("distro notifier started");
        while (true) {
            try {
                // take() blocks until a task is available and never returns null.
                Pair pair = tasks.take();
                String datumKey = (String) pair.getValue0();
                ApplyAction action = (ApplyAction) pair.getValue1();
                // Clear the pending marker so later CHANGE tasks for this key are accepted again.
                services.remove(datumKey);
                int count = 0;
                if (!listeners.containsKey(datumKey)) {
                    continue;
                }
                for (RecordListener listener : listeners.get(datumKey)) {
                    count++;
                    try {
                        if (action == ApplyAction.CHANGE) {
                            listener.onChange(datumKey,dataStore.get(datumKey).value);
                            continue;
                        }
                        if (action == ApplyAction.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}",datumKey,e);
                    }
                }
                if (Loggers.DISTRO.isDebugEnabled()) {
                    // Three placeholders need three arguments: key, listener count, action.
                    Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified,key: {},listener count: {},action: {}",
                        datumKey,count,action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task",e);
            }
        }
    }
}
複製程式碼
- Notifier實現了Runnable介面,其run方法會從LinkedBlockingQueue取task,然後挨個執行listener回撥
小結
- DistroConsistencyServiceImpl實現了EphemeralConsistencyService介面
- 其init方法會非同步執行load方法,該方法會執行syncAllDataFromRemote進行初始化,該方法會通過NamingProxy.getAllData獲取data,然後執行processData,它主要是執行回撥然後往dataStore新增資料;init方法最後會非同步執行Notifier
- 其put方法會執行onPut方法及taskDispatcher.addTask(key);其remove方法會執行onRemove方法及listeners.remove(key);其get方法直接從dataStore讀取;其listen會新增RecordListener;其unlisten則會移除RecordListener;其isAvailable會通過isInitialized及ServerStatus.UP狀態來判斷