1. 程式人生 > >zookeeper 資料與儲存原始碼實現

zookeeper 資料與儲存原始碼實現

在zk叢集啟動的時候,根據選擇的io型別(NIO或者Netty方式)啟動監聽客戶端連線通道,比如NIO方式

    @Override
    public void configure(InetSocketAddress addr, int maxcc) throws IOException {
        configureSaslLogin();

        thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
        thread.setDaemon(true);
        maxClientCnxns = maxcc;
        this
.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port " + addr); ss.socket().bind(addr); ss.configureBlocking(false); ss.register(selector, SelectionKey.OP_ACCEPT); }

然後會跑一個執行緒,迴圈去取接收到的客戶端連線事件

    public void run() {
        while (!ss.socket
().isClosed()) { try { selector.select(1000); Set<SelectionKey> selected; synchronized (this) { selected = selector.selectedKeys(); } ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>( selected);
Collections.shuffle(selectedList); for (SelectionKey k : selectedList) { //表示客戶端有連線請求 if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { SocketChannel sc = ((ServerSocketChannel) k .channel()).accept(); InetAddress ia = sc.socket().getInetAddress(); int cnxncount = getClientCnxnCount(ia); if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){ LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns ); sc.close(); } else { LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress()); sc.configureBlocking(false); // 向selector註冊一個讀請求監聽 SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); //建立連線生成一個NIOServerCnxn NIOServerCnxn cnxn = createConnection(sc, sk); sk.attach(cnxn); addCnxn(cnxn); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { NIOServerCnxn c = (NIOServerCnxn) k.attachment(); c.doIO(k); } else { if (LOG.isDebugEnabled()) { LOG.debug("Unexpected ops in select " + k.readyOps()); } } } selected.clear(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring exception", e); } } closeAll(); LOG.info("NIOServerCnxn factory exited run method"); }