1. 程式人生 > 程式設計 >聊聊nacos NamingProxy的getServiceList

聊聊nacos NamingProxy的getServiceList

本文主要研究一下nacos NamingProxy的getServiceList

NamingProxy.initRefreshSrvIfNeed

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/net/NamingProxy.java

public class NamingProxy {

    private static final int DEFAULT_SERVER_PORT = 8848;

    private int serverPort = DEFAULT_SERVER_PORT;

    private String namespaceId;

    private String endpoint;

    private String nacosDomain;

    private List<String> serverList;

    private List<String> serversFromEndpoint = new ArrayList<String>();

    private long lastSrvRefTime = 0L;

    private long vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30);

    private Properties properties;

    public NamingProxy(String namespaceId,String endpoint,String serverList) {

        this.namespaceId = namespaceId;
        this.endpoint = endpoint;
        if
(StringUtils.isNotEmpty(serverList)) { this.serverList = Arrays.asList(serverList.split(",")); if (this.serverList.size() == 1) { this.nacosDomain = serverList; } } initRefreshSrvIfNeed(); } private void initRefreshSrvIfNeed
() { if (StringUtils.isEmpty(endpoint)) { return; } ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.naming.serverlist.updater"
); t.setDaemon(true); return t; } }); executorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { refreshSrvIfNeed(); } },vipSrvRefInterMillis,TimeUnit.MILLISECONDS); refreshSrvIfNeed(); } //...... private void refreshSrvIfNeed() { try { if (!CollectionUtils.isEmpty(serverList)) { NAMING_LOGGER.debug("server list provided by user: " + serverList); return; } if (System.currentTimeMillis() - lastSrvRefTime < vipSrvRefInterMillis) { return; } List<String> list = getServerListFromEndpoint(); if (CollectionUtils.isEmpty(list)) { throw new Exception("Can not acquire Nacos list"); } if (!CollectionUtils.isEqualCollection(list,serversFromEndpoint)) { NAMING_LOGGER.info("[SERVER-LIST] server list is updated: " + list); } serversFromEndpoint = list; lastSrvRefTime = System.currentTimeMillis(); } catch (Throwable e) { NAMING_LOGGER.warn("failed to update server list",e); } } public List<String> getServerListFromEndpoint() { try { String urlString = "http://" + endpoint + "/nacos/serverlist"; List<String> headers = builderHeaders(); HttpClient.HttpResult result = HttpClient.httpGet(urlString,headers,null,UtilAndComs.ENCODING); if (HttpURLConnection.HTTP_OK != result.code) { throw new IOException("Error while requesting: " + urlString + "'. Server returned: " + result.code); } String content = result.content; List<String> list = new ArrayList<String>(); for (String line : IoUtils.readLines(new StringReader(content))) { if (!line.trim().isEmpty()) { list.add(line.trim()); } } return list; } catch (Exception e) { e.printStackTrace(); } return null; } //...... } 複製程式碼
  • NamingProxy的構造器執行了initRefreshSrvIfNeed方法,該方法在endpoint不為空的時候,會註冊一個定時任務,每隔vipSrvRefInterMillis時間執行一次refreshSrvIfNeed方法,同時立馬呼叫了refreshSrvIfNeed方法
  • refreshSrvIfNeed方法在serverList為空,且距離lastSrvRefTime大於等於vipSrvRefInterMillis時會通過getServerListFromEndpoint()方法獲取serverList更新serversFromEndpoint及lastSrvRefTime
  • getServerListFromEndpoint方法會向endpoint請求/serverlist介面,獲取server端返回的serverList

NamingProxy.getServiceList

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/net/NamingProxy.java

public class NamingProxy {

    private static final int DEFAULT_SERVER_PORT = 8848;

    private int serverPort = DEFAULT_SERVER_PORT;

    private String namespaceId;

    private String endpoint;

    private String nacosDomain;

    private List<String> serverList;

    private List<String> serversFromEndpoint = new ArrayList<String>();

    private long lastSrvRefTime = 0L;

    private long vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30);

    private Properties properties;

    //......

    public ListView<String> getServiceList(int pageNo,int pageSize,String groupName) throws NacosException {
        return getServiceList(pageNo,pageSize,groupName,null);
    }

    public ListView<String> getServiceList(int pageNo,String groupName,AbstractSelector selector) throws NacosException {

        Map<String,String> params = new HashMap<String,String>(4);
        params.put("pageNo",String.valueOf(pageNo));
        params.put("pageSize",String.valueOf(pageSize));
        params.put(CommonParams.NAMESPACE_ID,namespaceId);
        params.put(CommonParams.GROUP_NAME,groupName);

        if (selector != null) {
            switch (SelectorType.valueOf(selector.getType())) {
                case none:
                    break;
                case label:
                    ExpressionSelector expressionSelector = (ExpressionSelector) selector;
                    params.put("selector",JSON.toJSONString(expressionSelector));
                    break;
                default:
                    break;
            }
        }

        String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/service/list",params);

        JSONObject json = JSON.parseObject(result);
        ListView<String> listView = new ListView<String>();
        listView.setCount(json.getInteger("count"));
        listView.setData(JSON.parseObject(json.getString("doms"),new TypeReference<List<String>>() {
        }));

        return listView;
    }

    public String reqAPI(String api,Map<String,String> params) throws NacosException {

        List<String> snapshot = serversFromEndpoint;
        if (!CollectionUtils.isEmpty(serverList)) {
            snapshot = serverList;
        }

        return reqAPI(api,params,snapshot);
    }

    public String reqAPI(String api,String> params,List<String> servers) {
        return reqAPI(api,servers,HttpMethod.GET);
    }

    public String reqAPI(String api,List<String> servers,String method) {

        params.put(CommonParams.NAMESPACE_ID,getNamespaceId());

        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
            throw new IllegalArgumentException("no server available");
        }

        Exception exception = new Exception();

        if (servers != null && !servers.isEmpty()) {

            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());

            for (int i = 0; i < servers.size(); i++) {
                String server = servers.get(index);
                try {
                    return callServer(api,server,method);
                } catch (NacosException e) {
                    exception = e;
                    NAMING_LOGGER.error("request {} failed.",e);
                } catch (Exception e) {
                    exception = e;
                    NAMING_LOGGER.error("request {} failed.",e);
                }

                index = (index + 1) % servers.size();
            }

            throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: "
                + exception.getMessage());
        }

        for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
            try {
                return callServer(api,nacosDomain);
            } catch (Exception e) {
                exception = e;
                NAMING_LOGGER.error("[NA] req api:" + api + " failed,server(" + nacosDomain,e);
            }
        }

        throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
            + exception.getMessage());

    }
                
    //......
}
複製程式碼
  • getServiceList方法有個AbstractSelector引數,它會往請求的引數裡頭新增selector引數,目前label型別會新增ExpressionSelector,之後呼叫reqAPI方法請求/service/list介面
  • reqAPI方法首先將serversFromEndpoint賦值給snapshot,但是serverList不為空的情況下會重置snapshot為serverList,然後進行reqAPI請求
  • reqAPI方法會根據servers.size()隨機一個index,然後以servers.size()為最大迴圈次數開始for迴圈,迴圈裡頭根據index獲取server然後通過callServer請求,請求成功則跳出迴圈返回,請求失敗則遞增index並對servers.size()取餘繼續下次迴圈,如果都請求失敗則最後丟擲IllegalStateException

小結

  • NamingProxy的構造器執行了initRefreshSrvIfNeed方法,該方法在endpoint不為空的時候,會註冊一個定時任務,每隔vipSrvRefInterMillis時間執行一次refreshSrvIfNeed方法
  • refreshSrvIfNeed方法在serverList為空,且距離lastSrvRefTime大於等於vipSrvRefInterMillis時會通過getServerListFromEndpoint()方法獲取serverList更新serversFromEndpoint及lastSrvRefTime
  • getServiceList方法優先以serverList作為server端地址列表,如果它為空再以serversFromEndpoint為準,然後通過reqAPI方法請求的時候,隨機選擇一個server進行請求,最多請求server.size()次,請求成功則跳出迴圈返回,請求失敗則遞增index並對servers.size()取餘繼續下次迴圈,如果都請求失敗則最後丟擲IllegalStateException

doc