A Look at the nacos server PushService
阿新 • Published: 2019-12-31
Preface
This article takes a look at the PushService in the nacos server.
PushService
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
@Component
public class PushService implements ApplicationContextAware,ApplicationListener<ServiceChangeEvent> {
@Autowired
private SwitchDomain switchDomain;
private ApplicationContext applicationContext;
private static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L);
private static final int MAX_RETRY_TIMES = 1;
private static volatile ConcurrentMap<String,Receiver.AckEntry> ackMap
= new ConcurrentHashMap<String,Receiver.AckEntry>();
private static ConcurrentMap<String,ConcurrentMap<String,PushClient>> clientMap
= new ConcurrentHashMap<String,ConcurrentMap<String,PushClient>>();
private static volatile ConcurrentHashMap<String,Long> udpSendTimeMap = new ConcurrentHashMap<String,Long>();
public static volatile ConcurrentHashMap<String,Long> pushCostMap = new ConcurrentHashMap<String,Long>();
private static int totalPush = 0;
private static int failedPush = 0;
private static ConcurrentHashMap<String,Long> lastPushMillisMap = new ConcurrentHashMap<>();
private static DatagramSocket udpSocket;
private static Map<String,Future> futureMap = new ConcurrentHashMap<>();
private static ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory () {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.push.retransmitter");
return t;
}
});
private static ScheduledExecutorService udpSender = Executors.newSingleThreadScheduledExecutor(new ThreadFactory () {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.push.udpSender");
return t;
}
});
static {
try {
udpSocket = new DatagramSocket();
Receiver receiver = new Receiver();
Thread inThread = new Thread(receiver);
inThread.setDaemon(true);
inThread.setName("com.alibaba.nacos.naming.push.receiver");
inThread.start();
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
removeClientIfZombie();
} catch (Throwable e) {
Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");
}
}
},0,20,TimeUnit.SECONDS);
} catch (SocketException e) {
Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
//......
public static void removeClientIfZombie() {
int size = 0;
for (Map.Entry<String,ConcurrentMap<String,PushClient>> entry : clientMap.entrySet()) {
ConcurrentMap<String,PushClient> clientConcurrentMap = entry.getValue();
for (Map.Entry<String,PushClient> entry1 : clientConcurrentMap.entrySet()) {
PushClient client = entry1.getValue();
if (client.zombie()) {
clientConcurrentMap.remove(entry1.getKey());
}
}
size += clientConcurrentMap.size();
}
if (Loggers.PUSH.isDebugEnabled()) {
Loggers.PUSH.debug("[NACOS-PUSH] clientMap size: {}",size);
}
}
//......
}
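- PushService implements the ApplicationContextAware and ApplicationListener<ServiceChangeEvent> interfaces. It holds two ScheduledExecutorService instances, one for the retransmitter and one for the udpSender. Its static block starts a daemon thread that runs the Receiver and registers a scheduled task (20 seconds between runs) that calls removeClientIfZombie, which iterates over clientMap and removes zombie clients.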
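The sweep over the nested clientMap can be sketched in isolation. This is a minimal illustration and not the nacos code: ZombieSweepSketch, Client and sweep are hypothetical names, and the clock and the cache threshold are passed in as plain parameters so the logic can be checked without waiting.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ZombieSweepSketch {

    /** Hypothetical stand-in for PushClient: it only remembers its last refresh time. */
    static final class Client {
        final long lastRefTime;

        Client(long lastRefTime) {
            this.lastRefTime = lastRefTime;
        }

        boolean zombie(long now, long pushCacheMillis) {
            return now - lastRefTime > pushCacheMillis;
        }
    }

    /** Drops expired clients from every per-service map and returns how many remain. */
    static int sweep(ConcurrentMap<String, ConcurrentMap<String, Client>> clientMap,
                     long now, long pushCacheMillis) {
        int size = 0;
        for (ConcurrentMap<String, Client> clients : clientMap.values()) {
            // Concurrent map views are weakly consistent, so removing during traversal is safe.
            clients.values().removeIf(c -> c.zombie(now, pushCacheMillis));
            size += clients.size();
        }
        return size;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Client> clients = new ConcurrentHashMap<>();
        clients.put("stale", new Client(0L));
        clients.put("fresh", new Client(9_000L));
        ConcurrentMap<String, ConcurrentMap<String, Client>> clientMap = new ConcurrentHashMap<>();
        clientMap.put("ns##demo", clients); // arbitrary illustrative key
        System.out.println("remaining clients: " + sweep(clientMap, 15_000L, 10_000L));
    }
}
```

Passing the current time in, rather than reading System.currentTimeMillis() inside zombie, is purely a choice made for this sketch to keep it deterministic.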
Receiver
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
public static class Receiver implements Runnable {
@Override
public void run() {
while (true) {
byte[] buffer = new byte[1024 * 64];
DatagramPacket packet = new DatagramPacket(buffer,buffer.length);
try {
udpSocket.receive(packet);
String json = new String(packet.getData(),0,packet.getLength(),Charset.forName("UTF-8")).trim();
AckPacket ackPacket = JSON.parseObject(json,AckPacket.class);
InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress();
String ip = socketAddress.getAddress().getHostAddress();
int port = socketAddress.getPort();
if (System.nanoTime() - ackPacket.lastRefTime > ACK_TIMEOUT_NANOS) {
Loggers.PUSH.warn("ack takes too long from {} ack json: {}",packet.getSocketAddress(),json);
}
String ackKey = getACKKey(ip,port,ackPacket.lastRefTime);
AckEntry ackEntry = ackMap.remove(ackKey);
if (ackEntry == null) {
throw new IllegalStateException("unable to find ackEntry for key: " + ackKey
+ ",ack json: " + json);
}
long pushCost = System.currentTimeMillis() - udpSendTimeMap.get(ackKey);
Loggers.PUSH.info("received ack: {} from: {}:,cost: {} ms,unacked: {},total push: {}",json,ip,pushCost,ackMap.size(),totalPush);
pushCostMap.put(ackKey,pushCost);
udpSendTimeMap.remove(ackKey);
} catch (Throwable e) {
Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data",e);
}
}
}
//......
public static class AckPacket {
public String type;
public long lastRefTime;
public String data;
}
}
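- Receiver implements the Runnable interface. Its run method calls udpSocket.receive in a while (true) loop, parses the received data into an AckPacket, removes the matching ackKey from ackMap, records the push cost in pushCostMap, and removes the ackKey from udpSendTimeMap.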
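The bookkeeping the Receiver performs on each ack can be sketched without a socket. This is an illustration under stated assumptions: AckBookkeepingSketch, recordSend and onAck are hypothetical names, the pending entry is reduced to a String payload, and the key layout in ackKey is assumed because the body of getACKKey is not shown in the listing above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class AckBookkeepingSketch {
    private final ConcurrentMap<String, String> ackMap = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Long> udpSendTimeMap = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Long> pushCostMap = new ConcurrentHashMap<>();

    /** Assumed key layout; the real getACKKey body is not part of the listing. */
    static String ackKey(String ip, int port, long lastRefTime) {
        return ip + "," + port + "," + lastRefTime;
    }

    /** What the sender side records before a packet goes out. */
    void recordSend(String key, String payload, long nowMillis) {
        ackMap.put(key, payload);
        udpSendTimeMap.put(key, nowMillis);
    }

    /** What the receiver side does for an ack; returns the push cost in milliseconds. */
    long onAck(String key, long nowMillis) {
        if (ackMap.remove(key) == null) {
            throw new IllegalStateException("unable to find ackEntry for key: " + key);
        }
        Long sentAt = udpSendTimeMap.remove(key);
        // Guard against a missing send time instead of unboxing null.
        long cost = nowMillis - (sentAt == null ? nowMillis : sentAt);
        pushCostMap.put(key, cost);
        return cost;
    }

    int unacked() {
        return ackMap.size();
    }

    public static void main(String[] args) {
        AckBookkeepingSketch sketch = new AckBookkeepingSketch();
        String key = ackKey("10.0.0.1", 5000, 42L);
        sketch.recordSend(key, "payload", 1_000L);
        System.out.println("cost ms: " + sketch.onAck(key, 1_250L));
        System.out.println("unacked: " + sketch.unacked());
    }
}
```

An ack that arrives for an unknown key raises IllegalStateException here, mirroring the listing, where the enclosing catch block logs the error and the loop continues.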
PushClient
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
public class PushClient {
private String namespaceId;
private String serviceName;
private String clusters;
private String agent;
private String tenant;
private String app;
private InetSocketAddress socketAddr;
private DataSource dataSource;
private Map<String,String[]> params;
public Map<String,String[]> getParams() {
return params;
}
public void setParams(Map<String,String[]> params) {
this.params = params;
}
public long lastRefTime = System.currentTimeMillis();
public PushClient(String namespaceId,String serviceName,String clusters,String agent,InetSocketAddress socketAddr,DataSource dataSource,String tenant,String app) {
this.namespaceId = namespaceId;
this.serviceName = serviceName;
this.clusters = clusters;
this.agent = agent;
this.socketAddr = socketAddr;
this.dataSource = dataSource;
this.tenant = tenant;
this.app = app;
}
public DataSource getDataSource() {
return dataSource;
}
public PushClient(InetSocketAddress socketAddr) {
this.socketAddr = socketAddr;
}
public boolean zombie() {
return System.currentTimeMillis() - lastRefTime > switchDomain.getPushCacheMillis(serviceName);
}
@Override
public String toString() {
return "serviceName: " + serviceName
+ ",clusters: " + clusters
+ ",ip: " + socketAddr.getAddress().getHostAddress()
+ ",port: " + socketAddr.getPort()
+ ",agent: " + agent;
}
public String getAgent() {
return agent;
}
public String getAddrStr() {
return socketAddr.getAddress().getHostAddress() + ":" + socketAddr.getPort();
}
public String getIp() {
return socketAddr.getAddress().getHostAddress();
}
@Override
public int hashCode() {
return Objects.hash(serviceName,clusters,socketAddr);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof PushClient)) {
return false;
}
PushClient other = (PushClient) obj;
return serviceName.equals(other.serviceName) && clusters.equals(other.clusters) && socketAddr.equals(other.socketAddr);
}
public String getClusters() {
return clusters;
}
public void setClusters(String clusters) {
this.clusters = clusters;
}
public String getNamespaceId() {
return namespaceId;
}
public void setNamespaceId(String namespaceId) {
this.namespaceId = namespaceId;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
public String getTenant() {
return tenant;
}
public void setTenant(String tenant) {
this.tenant = tenant;
}
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public InetSocketAddress getSocketAddr() {
return socketAddr;
}
public void refresh() {
lastRefTime = System.currentTimeMillis();
}
}
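- PushClient encapsulates the address and related information of the push target. Its zombie method decides whether the target has gone stale: it checks whether the time elapsed since lastRefTime exceeds the PushCacheMillis that switchDomain specifies for the serviceName (10 seconds by default); if so, the client is considered a zombie.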
PushService.onApplicationEvent
@Component
public class PushService implements ApplicationContextAware,ApplicationListener<ServiceChangeEvent> {
//......
@Override
public void onApplicationEvent(ServiceChangeEvent event) {
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
Future future = udpSender.schedule(new Runnable() {
@Override
public void run() {
try {
Loggers.PUSH.info(serviceName + " is changed,add it to push queue.");
ConcurrentMap<String,PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId,serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}
Map<String,Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
for (PushClient client : clients.values()) {
if (client.zombie()) {
Loggers.PUSH.debug("client is zombie: " + client.toString());
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client.toString());
continue;
}
Receiver.AckEntry ackEntry;
Loggers.PUSH.debug("push serviceName: {} to client: {}",serviceName,client.toString());
String key = getPushCacheKey(serviceName,client.getIp(),client.getAgent());
byte[] compressData = null;
Map<String,Object> data = null;
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String,Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}",serviceName,client.getAddrStr());
}
if (compressData != null) {
ackEntry = prepareAckEntry(client,compressData,data,lastRefTime);
} else {
ackEntry = prepareAckEntry(client,prepareHostsData(client),lastRefTime);
if (ackEntry != null) {
cache.put(key,new org.javatuples.Pair<>(ackEntry.origin.getData(),ackEntry.data));
}
}
Loggers.PUSH.info("serviceName: {} changed,schedule push for: {},agent: {},key: {}",client.getServiceName(),client.getAddrStr(),client.getAgent(),(ackEntry == null ? null : ackEntry.key));
udpPush(ackEntry);
}
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client,error: {}",serviceName,e);
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId,serviceName));
}
}
},1000,TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId,serviceName),future);
}
//......
public void serviceChanged(Service service) {
// merge some change events to reduce the push frequency:
if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(),service.getName()))) {
return;
}
this.applicationContext.publishEvent(new ServiceChangeEvent(this,service));
}
//......
}
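- onApplicationEvent handles ServiceChangeEvent. It schedules a delayed task (1000 ms) on udpSender and puts the resulting future into futureMap. The task looks up the clients for the given namespaceId and serviceName in clientMap and iterates over them: a zombie client is removed, and for any other client it builds a Receiver.AckEntry and calls udpPush(ackEntry). Finally the task removes the future from futureMap. The serviceChanged method is the entry point for external callers to publish a ServiceChangeEvent; it returns early when futureMap already holds a pending push for the service, which merges bursts of changes into one push.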
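The merging of change events through futureMap can be sketched on its own. This is a simplified illustration, not the nacos implementation: PushMergeSketch, drainAndCount and the counter are hypothetical, the Spring event hop is left out, and the push itself is replaced by incrementing a counter.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PushMergeSketch {
    private final ScheduledExecutorService sender = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<String, Future<?>> futureMap = new ConcurrentHashMap<>();
    private final AtomicInteger pushes = new AtomicInteger();

    /** Returns true if a push was scheduled, false if it was merged into a pending one. */
    boolean serviceChanged(String serviceKey, long delayMillis) {
        if (futureMap.containsKey(serviceKey)) {
            return false; // a push for this service is already queued
        }
        Future<?> future = sender.schedule(() -> {
            try {
                pushes.incrementAndGet(); // stand-in for pushing to every client
            } finally {
                futureMap.remove(serviceKey); // let the next change schedule again
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
        futureMap.put(serviceKey, future);
        return true;
    }

    /** Lets queued pushes finish, stops the scheduler, and returns the push count. */
    int drainAndCount() {
        sender.shutdown(); // delayed tasks that are already queued still run by default
        try {
            sender.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pushes.get();
    }

    public static void main(String[] args) {
        PushMergeSketch sketch = new PushMergeSketch();
        for (int i = 0; i < 3; i++) {
            System.out.println("scheduled: " + sketch.serviceChanged("ns##demo", 300));
        }
        System.out.println("pushes: " + sketch.drainAndCount());
    }
}
```

The containsKey check followed by put is not atomic, so two threads could both schedule a push for the same key; the sketch keeps that shape because it matches the listing, where the only consequence would be an extra push.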
PushService.udpPush
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
@Component
public class PushService implements ApplicationContextAware,ApplicationListener<ServiceChangeEvent> {
//......
public static class Receiver implements Runnable {
//......
public static class AckEntry {
public AckEntry(String key,DatagramPacket packet) {
this.key = key;
this.origin = packet;
}
public void increaseRetryTime() {
retryTimes.incrementAndGet();
}
public int getRetryTimes() {
return retryTimes.get();
}
public String key;
public DatagramPacket origin;
private AtomicInteger retryTimes = new AtomicInteger(0);
public Map<String,Object> data;
}
//......
}
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
if (ackEntry == null) {
Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
return null;
}
if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
Loggers.PUSH.warn("max re-push times reached,retry times {},key: {}",ackEntry.retryTimes,ackEntry.key);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return ackEntry;
}
try {
if (!ackMap.containsKey(ackEntry.key)) {
totalPush++;
}
ackMap.put(ackEntry.key,ackEntry);
udpSendTimeMap.put(ackEntry.key,System.currentTimeMillis());
Loggers.PUSH.info("send udp packet: " + ackEntry.key);
udpSocket.send(ackEntry.origin);
ackEntry.increaseRetryTime();
executorService.schedule(new Retransmitter(ackEntry),TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS),TimeUnit.MILLISECONDS);
return ackEntry;
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {},error: {}",ackEntry.data,ackEntry.origin.getAddress().getHostAddress(),e);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return null;
}
}
//......
}
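- udpPush decides what to do based on the Receiver.AckEntry. If its retry count is greater than MAX_RETRY_TIMES, the push is abandoned and the entry is removed from ackMap and udpSendTimeMap. Otherwise it puts ackEntry.key into ackMap and udpSendTimeMap, calls udpSocket.send(ackEntry.origin) and ackEntry.increaseRetryTime(), and schedules a Retransmitter as a delayed task that fires after ACK_TIMEOUT_NANOS (10 seconds). If an exception is thrown, the entry is removed from ackMap and udpSendTimeMap.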
Retransmitter
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
public static class Retransmitter implements Runnable {
Receiver.AckEntry ackEntry;
public Retransmitter(Receiver.AckEntry ackEntry) {
this.ackEntry = ackEntry;
}
@Override
public void run() {
if (ackMap.containsKey(ackEntry.key)) {
Loggers.PUSH.info("retry to push data,key: " + ackEntry.key);
udpPush(ackEntry);
}
}
}
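- Retransmitter implements the Runnable interface. Its run method retries udpPush only while ackMap still contains ackEntry.key, that is, while no ack has arrived for that push.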
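How udpPush and Retransmitter interact can be traced with a small model. This is a sketch and not the nacos code: RetrySketch, Entry, push, onTimeout and ack are hypothetical names, the UDP send is replaced by a counter, and the 10-second timer is replaced by calling onTimeout directly. It suggests that with MAX_RETRY_TIMES = 1 and a `>` comparison, an unacknowledged packet goes out twice before the entry is dropped.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class RetrySketch {
    static final int MAX_RETRY_TIMES = 1;

    /** Hypothetical stand-in for Receiver.AckEntry without the datagram. */
    static final class Entry {
        final String key;
        final AtomicInteger retryTimes = new AtomicInteger(0);

        Entry(String key) {
            this.key = key;
        }
    }

    private final ConcurrentMap<String, Entry> ackMap = new ConcurrentHashMap<>();
    int sent;   // packets that would have been written to the socket
    int failed; // pushes abandoned after the retry limit

    /** Mirrors the shape of udpPush: give up past the limit, otherwise send and count. */
    boolean push(Entry entry) {
        if (entry.retryTimes.get() > MAX_RETRY_TIMES) {
            ackMap.remove(entry.key);
            failed++;
            return false;
        }
        ackMap.put(entry.key, entry);
        sent++; // stand-in for udpSocket.send(...)
        entry.retryTimes.incrementAndGet();
        return true;
    }

    /** Mirrors the Retransmitter: push again only if the entry is still unacked. */
    boolean onTimeout(Entry entry) {
        return ackMap.containsKey(entry.key) && push(entry);
    }

    /** Mirrors the Receiver removing the entry when an ack arrives. */
    void ack(String key) {
        ackMap.remove(key);
    }

    public static void main(String[] args) {
        RetrySketch sketch = new RetrySketch();
        Entry lost = new Entry("never-acked");
        sketch.push(lost);
        while (sketch.onTimeout(lost)) {
            // keep firing the timeout until the sketch gives up
        }
        System.out.println("sent=" + sketch.sent + ", failed=" + sketch.failed);
    }
}
```

An entry that is acked before the timeout leaves ackMap, so onTimeout returns false without sending anything.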
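Summary
- PushService implements the ApplicationContextAware and ApplicationListener<ServiceChangeEvent> interfaces.
- Its static block starts a daemon thread that runs the Receiver and registers a scheduled task that calls removeClientIfZombie, which iterates over clientMap and removes zombie clients.
- Its onApplicationEvent handles ServiceChangeEvent: it schedules a delayed task and puts the future into futureMap. The task looks up the clients for the given namespaceId and serviceName in clientMap and iterates over them; a zombie client is removed, and for any other client it builds a Receiver.AckEntry and calls udpPush(ackEntry). Finally it removes the future from futureMap. The serviceChanged method is the entry point for external callers to publish a ServiceChangeEvent.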