A Look at the nacos server PushService

This article takes a look at the PushService in nacos server.

PushService

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

@Component
public class PushService implements ApplicationContextAware,ApplicationListener<ServiceChangeEvent> {

    @Autowired
    private SwitchDomain switchDomain;

    private ApplicationContext applicationContext;

    private static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L);

    private static final int MAX_RETRY_TIMES = 1;

    private static volatile ConcurrentMap<String,Receiver.AckEntry> ackMap
        = new ConcurrentHashMap<String,Receiver.AckEntry>();

    private static ConcurrentMap<String,ConcurrentMap<String,PushClient>> clientMap
        = new ConcurrentHashMap<String,ConcurrentMap<String,PushClient>>();

    private static volatile ConcurrentHashMap<String,Long> udpSendTimeMap = new ConcurrentHashMap<String,Long>();

    public static volatile ConcurrentHashMap<String,Long> pushCostMap = new ConcurrentHashMap<String,Long>();

    private static int totalPush = 0;

    private static int failedPush = 0;

    private static ConcurrentHashMap<String,Long> lastPushMillisMap = new ConcurrentHashMap<>();

    private static DatagramSocket udpSocket;

    private static Map<String,Future> futureMap = new ConcurrentHashMap<>();
    private static ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.push.retransmitter");
            return t;
        }
    });

    private static ScheduledExecutorService udpSender = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.push.udpSender");
            return t;
        }
    });

    static {
        try {
            udpSocket = new DatagramSocket();

            Receiver receiver = new Receiver();

            Thread inThread = new Thread(receiver);
            inThread.setDaemon(true);
            inThread.setName("com.alibaba.nacos.naming.push.receiver");
            inThread.start();

            executorService.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        removeClientIfZombie();
                    } catch (Throwable e) {
                        Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");
                    }
                }
            },0,20,TimeUnit.SECONDS);

        } catch (SocketException e) {
            Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    //......

    public static void removeClientIfZombie() {

        int size = 0;
        for (Map.Entry<String,ConcurrentMap<String,PushClient>> entry : clientMap.entrySet()) {
            ConcurrentMap<String,PushClient> clientConcurrentMap = entry.getValue();
            for (Map.Entry<String,PushClient> entry1 : clientConcurrentMap.entrySet()) {
                PushClient client = entry1.getValue();
                if (client.zombie()) {
                    clientConcurrentMap.remove(entry1.getKey());
                }
            }

            size += clientConcurrentMap.size();
        }

        if (Loggers.PUSH.isDebugEnabled()) {
            Loggers.PUSH.debug("[NACOS-PUSH] clientMap size: {}",size);
        }

    }

    //......
}
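  • PushService implements the ApplicationContextAware and ApplicationListener<ServiceChangeEvent> interfaces. It owns two ScheduledExecutorService instances, one for the retransmitter and one for the udpSender. Its static initializer block starts a daemon thread that runs the Receiver and registers a periodic task (fixed delay of 20 seconds) that runs removeClientIfZombie, which iterates over clientMap and removes zombie clients.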
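
The daemon-thread scheduler and the try/catch around removeClientIfZombie can be sketched in isolation. This is a minimal sketch, not nacos code: the class and method names (DaemonSchedulerSketch, guarded, survivesFailures) are hypothetical. It shows why the periodic task swallows Throwable: per the ScheduledExecutorService contract, an exception escaping one run of a scheduleWithFixedDelay task suppresses all later runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DaemonSchedulerSketch {

    /** Single-threaded scheduler whose worker is a named daemon thread. */
    static ScheduledExecutorService newDaemonScheduler(String threadName) {
        return Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName(threadName);
            return t;
        });
    }

    /** Wraps a task so that a failure is reported instead of cancelling the schedule. */
    static Runnable guarded(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable e) {
                System.err.println("periodic task failed: " + e);
            }
        };
    }

    /** Schedules a task that always throws and reports whether it still ran `runs` times. */
    static boolean survivesFailures(int runs) {
        ScheduledExecutorService scheduler = newDaemonScheduler("sketch.sweeper");
        CountDownLatch latch = new CountDownLatch(runs);
        scheduler.scheduleWithFixedDelay(guarded(() -> {
            latch.countDown();
            throw new IllegalStateException("simulated failure");
        }), 0, 10, TimeUnit.MILLISECONDS);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Calling DaemonSchedulerSketch.survivesFailures(3) should return true; without the guarded wrapper the task would run only once.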

Receiver

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

    public static class Receiver implements Runnable {
        @Override
        public void run() {
            while (true) {
                byte[] buffer = new byte[1024 * 64];
                DatagramPacket packet = new DatagramPacket(buffer,buffer.length);

                try {
                    udpSocket.receive(packet);

                    String json = new String(packet.getData(),0,packet.getLength(),Charset.forName("UTF-8")).trim();
                    AckPacket ackPacket = JSON.parseObject(json,AckPacket.class);

                    InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress();
                    String ip = socketAddress.getAddress().getHostAddress();
                    int port = socketAddress.getPort();

                    if (System.nanoTime() - ackPacket.lastRefTime > ACK_TIMEOUT_NANOS) {
                        Loggers.PUSH.warn("ack takes too long from {} ack json: {}",packet.getSocketAddress(),json);
                    }

                    String ackKey = getACKKey(ip,port,ackPacket.lastRefTime);
                    AckEntry ackEntry = ackMap.remove(ackKey);
                    if (ackEntry == null) {
                        throw new IllegalStateException("unable to find ackEntry for key: " + ackKey
                            + ",ack json: " + json);
                    }

                    long pushCost = System.currentTimeMillis() - udpSendTimeMap.get(ackKey);

                    Loggers.PUSH.info("received ack: {} from: {}:{},cost: {} ms,unacked: {},total push: {}",json,ip,port,pushCost,ackMap.size(),totalPush);

                    pushCostMap.put(ackKey,pushCost);

                    udpSendTimeMap.remove(ackKey);

                } catch (Throwable e) {
                    Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data",e);
                }
            }
        }

        //......

        public static class AckPacket {
            public String type;
            public long lastRefTime;

            public String data;
        }
    }
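  • Receiver implements the Runnable interface. Its run method calls udpSocket.receive in a while (true) loop, parses each datagram into an AckPacket, removes the matching ackKey from ackMap, records the push cost in pushCostMap, and removes the ackKey from udpSendTimeMap. If no entry exists for the ackKey, it throws an IllegalStateException, which the catch block in the loop logs before the loop continues.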
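
The bookkeeping the Receiver performs on each ack can be sketched without any networking. The following is a minimal sketch under stated assumptions: AckTrackerSketch and its methods are hypothetical names, the key format in ackKey is an assumption (the real getACKKey is elided in the listing above), and timestamps are passed in so the behaviour is deterministic. Unlike the Receiver, an unknown key yields an empty result rather than an exception.

```java
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class AckTrackerSketch {

    // ack key -> send time in millis (plays the role of udpSendTimeMap)
    private final ConcurrentMap<String, Long> sendTimes = new ConcurrentHashMap<>();

    /** Assumed key format; the real getACKKey is not shown in the article. */
    static String ackKey(String ip, int port, long lastRefTime) {
        return ip + "," + port + "," + lastRefTime;
    }

    /** Remembers when a push for the given key was sent. */
    void recordSend(String key, long nowMillis) {
        sendTimes.put(key, nowMillis);
    }

    /** Returns the push cost in millis, or empty if no push is pending for the key. */
    OptionalLong onAck(String key, long nowMillis) {
        Long sentAt = sendTimes.remove(key);
        return sentAt == null ? OptionalLong.empty() : OptionalLong.of(nowMillis - sentAt);
    }

    /** Number of pushes still waiting for an ack. */
    int pending() {
        return sendTimes.size();
    }
}
```

Because remove is atomic on a ConcurrentMap, a duplicate ack for the same key finds nothing and is reported as empty.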

PushClient

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

    public class PushClient {
        private String namespaceId;
        private String serviceName;
        private String clusters;
        private String agent;
        private String tenant;
        private String app;
        private InetSocketAddress socketAddr;
        private DataSource dataSource;
        private Map<String,String[]> params;

        public Map<String,String[]> getParams() {
            return params;
        }

        public void setParams(Map<String,String[]> params) {
            this.params = params;
        }

        public long lastRefTime = System.currentTimeMillis();

        public PushClient(String namespaceId,String serviceName,String clusters,String agent,InetSocketAddress socketAddr,DataSource dataSource,String tenant,String app) {
            this.namespaceId = namespaceId;
            this.serviceName = serviceName;
            this.clusters = clusters;
            this.agent = agent;
            this.socketAddr = socketAddr;
            this.dataSource = dataSource;
            this.tenant = tenant;
            this.app = app;
        }

        public DataSource getDataSource() {
            return dataSource;
        }

        public PushClient(InetSocketAddress socketAddr) {
            this.socketAddr = socketAddr;
        }

        public boolean zombie() {
            return System.currentTimeMillis() - lastRefTime > switchDomain.getPushCacheMillis(serviceName);
        }

        @Override
        public String toString() {
            return "serviceName: " + serviceName
                + ",clusters: " + clusters
                + ",ip: " + socketAddr.getAddress().getHostAddress()
                + ",port: " + socketAddr.getPort()
                + ",agent: " + agent;
        }

        public String getAgent() {
            return agent;
        }

        public String getAddrStr() {
            return socketAddr.getAddress().getHostAddress() + ":" + socketAddr.getPort();
        }

        public String getIp() {
            return socketAddr.getAddress().getHostAddress();
        }

        @Override
        public int hashCode() {
            return Objects.hash(serviceName,clusters,socketAddr);
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof PushClient)) {
                return false;
            }

            PushClient other = (PushClient) obj;

            return serviceName.equals(other.serviceName) && clusters.equals(other.clusters) && socketAddr.equals(other.socketAddr);
        }

        public String getClusters() {
            return clusters;
        }

        public void setClusters(String clusters) {
            this.clusters = clusters;
        }

        public String getNamespaceId() {
            return namespaceId;
        }

        public void setNamespaceId(String namespaceId) {
            this.namespaceId = namespaceId;
        }

        public String getServiceName() {
            return serviceName;
        }

        public void setServiceName(String serviceName) {
            this.serviceName = serviceName;
        }

        public String getTenant() {
            return tenant;
        }

        public void setTenant(String tenant) {
            this.tenant = tenant;
        }

        public String getApp() {
            return app;
        }

        public void setApp(String app) {
            this.app = app;
        }

        public InetSocketAddress getSocketAddr() {
            return socketAddr;
        }

        public void refresh() {
            lastRefTime = System.currentTimeMillis();
        }
    }
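  • PushClient wraps the information about a push target, such as its socket address. Its zombie method checks whether the time elapsed since lastRefTime exceeds the PushCacheMillis that switchDomain specifies for the serviceName (10 seconds by default); if it does, the client is considered a zombie. Calling refresh resets lastRefTime to the current time.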
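
The zombie check is a plain time-to-live comparison. As a minimal sketch (ClientLeaseSketch is a hypothetical class, and the current time and the cache duration are passed in as parameters instead of being read from System.currentTimeMillis() and switchDomain):

```java
final class ClientLeaseSketch {

    // Last time the client refreshed its subscription, in millis.
    private volatile long lastRefTime;

    ClientLeaseSketch(long nowMillis) {
        this.lastRefTime = nowMillis;
    }

    /** Mirrors PushClient.refresh(): restart the lease from `nowMillis`. */
    void refresh(long nowMillis) {
        lastRefTime = nowMillis;
    }

    /** Mirrors PushClient.zombie(): true once the lease is strictly older than the cache time. */
    boolean zombie(long nowMillis, long pushCacheMillis) {
        return nowMillis - lastRefTime > pushCacheMillis;
    }
}
```

Note that the comparison is strict, so a client whose age equals the cache time exactly is not yet a zombie.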

PushService.onApplicationEvent

@Component
public class PushService implements ApplicationContextAware,ApplicationListener<ServiceChangeEvent> {

	//......

    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();

        Future future = udpSender.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    Loggers.PUSH.info(serviceName + " is changed,add it to push queue.");
                    ConcurrentMap<String,PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId,serviceName));
                    if (MapUtils.isEmpty(clients)) {
                        return;
                    }

                    Map<String,Object> cache = new HashMap<>(16);
                    long lastRefTime = System.nanoTime();
                    for (PushClient client : clients.values()) {
                        if (client.zombie()) {
                            Loggers.PUSH.debug("client is zombie: " + client.toString());
                            clients.remove(client.toString());
                            Loggers.PUSH.debug("client is zombie: " + client.toString());
                            continue;
                        }

                        Receiver.AckEntry ackEntry;
                        Loggers.PUSH.debug("push serviceName: {} to client: {}",serviceName,client.toString());
                        String key = getPushCacheKey(serviceName,client.getIp(),client.getAgent());
                        byte[] compressData = null;
                        Map<String,Object> data = null;
                        if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                            org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                            compressData = (byte[]) (pair.getValue0());
                            data = (Map<String,Object>) pair.getValue1();

                            Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}",serviceName,client.getAddrStr());
                        }

                        if (compressData != null) {
                            ackEntry = prepareAckEntry(client,compressData,data,lastRefTime);
                        } else {
                            ackEntry = prepareAckEntry(client,prepareHostsData(client),lastRefTime);
                            if (ackEntry != null) {
                                cache.put(key,new org.javatuples.Pair<>(ackEntry.origin.getData(),ackEntry.data));
                            }
                        }

                        Loggers.PUSH.info("serviceName: {} changed,schedule push for: {},agent: {},key: {}",client.getServiceName(),client.getAddrStr(),client.getAgent(),(ackEntry == null ? null : ackEntry.key));

                        udpPush(ackEntry);
                    }
                } catch (Exception e) {
                    Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client,error: {}",serviceName,e);

                } finally {
                    futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId,serviceName));
                }

            }
        },1000,TimeUnit.MILLISECONDS);

        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId,serviceName),future);

    }

    //......

    public void serviceChanged(Service service) {
        // merge some change events to reduce the push frequency:
        if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(),service.getName()))) {
            return;
        }

        this.applicationContext.publishEvent(new ServiceChangeEvent(this,service));
    }

    //......    
}
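  • onApplicationEvent handles ServiceChangeEvent. It schedules a task on udpSender with a delay of 1000 ms and stores the returned future in futureMap. When the task runs, it looks up the clients for the given namespaceId and serviceName in clientMap and iterates over them: a zombie client is removed, while for any other client it builds a Receiver.AckEntry and calls udpPush(ackEntry). Finally it removes the future from futureMap. The serviceChanged method is the entry point for external callers to publish a ServiceChangeEvent; it returns without publishing if futureMap already holds a pending future for the service, which merges bursts of changes into a single push.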
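
The merging of change events can be sketched as a small debouncer. This is a minimal sketch, assuming hypothetical names (ChangeMergeSketch, drainAndCount) and a simplification: it tracks pending service keys in a concurrent set that is updated before the task is scheduled, instead of storing the Future after scheduling as PushService does. A counter stands in for the actual UDP push.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ChangeMergeSketch {

    // Service keys that already have a push scheduled (plays the role of futureMap).
    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    private final ScheduledExecutorService sender = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger pushes = new AtomicInteger();
    private final long delayMillis;

    ChangeMergeSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Returns true if a push was scheduled, false if the change was merged into a pending one. */
    boolean serviceChanged(String serviceKey) {
        if (!pending.add(serviceKey)) {
            return false;
        }
        sender.schedule(() -> {
            try {
                pushes.incrementAndGet(); // stands in for pushing to every subscribed client
            } finally {
                pending.remove(serviceKey);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
        return true;
    }

    /** Lets the already scheduled pushes run, then returns how many pushes happened. */
    int drainAndCount() {
        sender.shutdown();
        try {
            sender.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pushes.get();
    }
}
```

Three calls to serviceChanged for the same key within the delay window schedule one push; the other two are merged.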

PushService.udpPush

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

@Component
public class PushService implements ApplicationContextAware,ApplicationListener<ServiceChangeEvent> {

	//......

    public static class Receiver implements Runnable {
    	//......

        public static class AckEntry {

            public AckEntry(String key,DatagramPacket packet) {
                this.key = key;
                this.origin = packet;
            }

            public void increaseRetryTime() {
                retryTimes.incrementAndGet();
            }

            public int getRetryTimes() {
                return retryTimes.get();
            }

            public String key;
            public DatagramPacket origin;
            private AtomicInteger retryTimes = new AtomicInteger(0);
            public Map<String,Object> data;
        }

    	//......
    }	

    private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
        if (ackEntry == null) {
            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }

        if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
            Loggers.PUSH.warn("max re-push times reached,retry times {},key: {}",ackEntry.retryTimes,ackEntry.key);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;
            return ackEntry;
        }

        try {
            if (!ackMap.containsKey(ackEntry.key)) {
                totalPush++;
            }
            ackMap.put(ackEntry.key,ackEntry);
            udpSendTimeMap.put(ackEntry.key,System.currentTimeMillis());

            Loggers.PUSH.info("send udp packet: " + ackEntry.key);
            udpSocket.send(ackEntry.origin);

            ackEntry.increaseRetryTime();

            executorService.schedule(new Retransmitter(ackEntry),TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS),TimeUnit.MILLISECONDS);

            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {},error: {}",ackEntry.data,ackEntry.origin.getAddress().getHostAddress(),e);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;

            return null;
        }
    }

	//......
}
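  • udpPush decides what to do based on the Receiver.AckEntry. If its retry count is greater than MAX_RETRY_TIMES, the push is abandoned: the entry is removed from ackMap and udpSendTimeMap and failedPush is incremented. Otherwise it puts ackEntry.key into ackMap and udpSendTimeMap, calls udpSocket.send(ackEntry.origin) and ackEntry.increaseRetryTime(), and schedules a Retransmitter to run after ACK_TIMEOUT_NANOS (10 seconds). If an exception is thrown, the entry is removed from ackMap and udpSendTimeMap and failedPush is incremented.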

Retransmitter

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

    public static class Retransmitter implements Runnable {
        Receiver.AckEntry ackEntry;

        public Retransmitter(Receiver.AckEntry ackEntry) {
            this.ackEntry = ackEntry;
        }

        @Override
        public void run() {
            if (ackMap.containsKey(ackEntry.key)) {
                Loggers.PUSH.info("retry to push data,key: " + ackEntry.key);
                udpPush(ackEntry);
            }
        }
    }
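  • Retransmitter implements the Runnable interface. Its run method retries udpPush only if ackMap still contains ackEntry.key, that is, only if no ack has arrived in the meantime.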
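
The interplay of udpPush, Retransmitter and the ack can be traced with a small single-threaded model. This is a minimal sketch, not the nacos implementation: RetransmitSketch and its methods are hypothetical, a counter stands in for udpSocket.send, and the ack timeout is triggered by calling onAckTimeout by hand instead of by a scheduler. It keeps the same retry rule as the listing above (give up when the retry count is greater than MAX_RETRY_TIMES, which is 1).

```java
import java.util.HashMap;
import java.util.Map;

final class RetransmitSketch {

    static final int MAX_RETRY_TIMES = 1;

    // ack key -> sends so far (stands in for ackMap plus AckEntry.retryTimes)
    private final Map<String, Integer> unacked = new HashMap<>();
    int sent;
    int failedPush;

    /** Mirrors udpPush: send unless the retry budget is exhausted. */
    void push(String key) {
        int retryTimes = unacked.getOrDefault(key, 0);
        if (retryTimes > MAX_RETRY_TIMES) {
            unacked.remove(key);
            failedPush++;
            return;
        }
        sent++; // stands in for udpSocket.send(ackEntry.origin)
        unacked.put(key, retryTimes + 1);
    }

    /** Mirrors Retransmitter.run: retry only while the key is still unacked. */
    void onAckTimeout(String key) {
        if (unacked.containsKey(key)) {
            push(key);
        }
    }

    /** Mirrors the Receiver removing the entry when an ack arrives. */
    void onAck(String key) {
        unacked.remove(key);
    }

    boolean isPending(String key) {
        return unacked.containsKey(key);
    }
}
```

Under this rule a packet that is never acked is sent twice (the original send plus one retry) and is counted as failed on the third attempt, while an acked packet is sent once and its later timeout is a no-op.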

Summary

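  • PushService implements the ApplicationContextAware and ApplicationListener<ServiceChangeEvent> interfaces.
  • Its static initializer block starts a daemon thread that runs the Receiver and registers a periodic task that runs removeClientIfZombie, which iterates over clientMap and removes zombie clients.
  • Its onApplicationEvent method handles ServiceChangeEvent. It schedules a delayed task and stores the future in futureMap. The task looks up the clients for the given namespaceId and serviceName in clientMap and iterates over them: a zombie client is removed, while for any other client it builds a Receiver.AckEntry and calls udpPush(ackEntry). Finally it removes the future from futureMap. The serviceChanged method is the entry point for external callers to publish a ServiceChangeEvent.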
