1. 程式人生 > >Zookeeper(一)客戶端

Zookeeper(一)客戶端

是否 觀察者 信息 err 一次 getch add tor ces

Zookeeper-客戶端

例子:

// org.apache.zookeeper.ZooKeeperMain
public class ZooKeeperMain {
    public static void main(String args[]) throws CliException, IOException, InterruptedException {
    	//1. 初始化zk配置,並建立連接
        ZooKeeperMain main = new ZooKeeperMain(args);
        //2. 一直等待控制臺讀入命令行 並執行
        main.run();
    }
	public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
        //1.1 連接配置解析
        cl.parseOptions(args);
        System.out.println("Connecting to " + cl.getOption("server"));
        connectToZK(cl.getOption("server"));
    }
    //1.2 建立連接
    protected void connectToZK(String newHost) throws InterruptedException, IOException {
        //連接已經存在 關閉連接 重新創建
        if (zk != null && zk.getState().isAlive()) {
            zk.close();
        }
        host = newHost;
        boolean readOnly = cl.getOption("readonly") != null;
        if (cl.getOption("secure") != null) {
            System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
            System.out.println("Secure connection is enabled");
        }
        zk = new ZooKeeperAdmin(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly);
    }
    //1.3 自定義監聽器
    private class MyWatcher implements Watcher {
        public void process(WatchedEvent event) {
            if (getPrintWatches()) {
                ZooKeeperMain.printMessage("WATCHER::");
                ZooKeeperMain.printMessage(event.toString());
            }
        }
    }
}

問題

  1. zk怎麽體現最終一致性:
  2. zk的監控在客戶端和服務端的連接過程中起到什麽作用:節點更新,服務端通知客戶端,客戶端調用回調方法處理
  3. zk對節點的原子操作是怎麽體現的:版本控制,節點內部維護三種版本
  4. 客戶端與服務端連接會話中的各個狀態下 客戶端處理什麽樣的事情

基本功能

  1. 以樹形結構存儲數據,葉子節點可以存儲數據

    • 文件系統
    • 配置管理
    • 命名服務
  2. 當某個節點的子節點變更,連接在這個節點的client可以實時監聽到變化

    • 集群管理
  3. client對節點操作時,是原子操作

    • 遠程鎖:分布式鎖

基本術語

  • States客戶端狀態:
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;
    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
  • Packet傳給服務端的數據包
//ClientCnxn內部定義的一個堆協議層的封裝,用作zk中請求和響應的載體;
static class Packet {
    
}
  • ClientCnxnSocket底層與服務端通信類
//真正與服務端連接的抽象類;有兩個子類分別使用jdk.nio/netty.nio實現會話操作
abstract class ClientCnxnSocket {
    abstract void connect(InetSocketAddress addr) throws IOException;
    //會從outgoingQueue中取出一個可發送的Packet對象,
    //同時生成一個客戶端請求序號XID並將其設置到Packet請求頭中去,
    //然後序列化後再發送,請求發送完畢後,會立即將該Packet保存到pendingQueue中,
    //以便等待服務端響應返回後進行相應的處理。
    abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue,
            ClientCnxn cnxn) throws IOException, InterruptedException;
}
  • WatchedEvent監控事件
//包含發生的事件,zookeeper當前狀態信息,事件涉及的節點路徑
public class WatchedEvent {
    final private KeeperState keeperState;
    final private EventType eventType;
    private String path;
}
  • Watcher:事件處理類的基本父類
  • KeeperState:Event事件中Zookeeper可能存在的所有狀態
  • EventType:Zookeeper中各種Event類型
  • WatcherType:
//內部包含兩個類Event,WatchType
public interface Watcher {
    
}

與服務端建立連接

ZooKeeperAdmin
//主要用於集群的管理任務,如重配置集群成員;
@InterfaceAudience.Public
public class ZooKeeperAdmin extends ZooKeeper {
	//ZooKeeperAdmin構造器最終調用父類構造器
	public ZooKeeperAdmin(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException {
        super(connectString, sessionTimeout, watcher, canBeReadOnly);
    }
}
Zookeeper:客戶端
//功能:1.初始化服務客戶端連接服務端 1).創建客戶端對象 2).啟動客戶端內部線程
//	   2.提供操作數據功能 	      1).向服務端發送請求
@InterfaceAudience.Public
public class ZooKeeper implements AutoCloseable {
	protected final ClientCnxn cnxn;
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly, HostProvider aHostProvider)
            throws IOException {
        this(connectString, sessionTimeout, watcher, canBeReadOnly,
                aHostProvider, null);
    }
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly, HostProvider aHostProvider,
            ZKClientConfig clientConfig) throws IOException {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
        if (clientConfig == null) {
            clientConfig = new ZKClientConfig();
        }
        this.clientConfig = clientConfig;
        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;
        //解析連接ip:port
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        hostProvider = aHostProvider;
        //1. 創建管理連接的客戶端 ChrootPath為客戶端自定義的路徑頭
        cnxn = createConnection(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        //2. 啟動客戶端內部線程        
        cnxn.start();
    }
    protected ClientCnxn createConnection(String chrootPath,
            HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            boolean canBeReadOnly) throws IOException {
        return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
                watchManager, clientCnxnSocket, canBeReadOnly);
    }
}
ClientCnxn
//維護服務端和客戶端之間的網絡連接,並進行一系列的網絡通信:維護一個可用服務器的列表,當某客戶端需要時可透明的切換服務
public class ClientCnxn {
    final SendThread sendThread;
    final EventThread eventThread;
    //客戶端可以連接的服務端地址集合
    private final HostProvider hostProvider;
    //需要發送給服務端的數據包:最終通過SendThread調用clientCnxnSocket.doTransport發送給服務端
    private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
    //已經發送給服務端但還未得到響應的數據包集合
    private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
    
    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;
        connectTimeout = sessionTimeout / hostProvider.size();
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;
        sendThread = new SendThread(clientCnxnSocket);                                           eventThread = new EventThread();
        this.clientConfig = zooKeeper.getClientConfig();
        initRequestTimeout();
    }
    public void start() {
        // 0.建立連接會話 1.sasl驗證 startConnect
        // 2.創建監聽事件到事件隊列中 3.保持心跳
        sendThread.start();
        // 處理事件隊列中的事件
        eventThread.start();
    }
}

SendThread
  1. 維護了客戶端與服務端之間的會話生命周期(通過一定周期頻率內向服務端發送PING包檢測心跳),如果會話周期內客戶端與服務端出現TCP連接斷開,那麽就會自動且透明地完成重連操作。
  2. 管理了客戶端所有的請求發送和響應接收操作,其將上層客戶端API操作轉換成相應的請求協議並發送到服務端,並完成對同步調用的返回和異步調用的回調。
  3. 將來自服務端的事件傳遞給EventThread去處理。
 //ClientCnxn內部類:為傳出請求隊列服務並生成心跳
class SendThread extends ZooKeeperThread {
    private final ClientCnxnSocket clientCnxnSocket;
    private InetSocketAddress rwServerAddress = null;
    SendThread(ClientCnxnSocket clientCnxnSocket) {
        super(makeThreadName("-SendThread()"));
        state = States.CONNECTING;
        this.clientCnxnSocket = clientCnxnSocket;
        setDaemon(true);
    }
    @Override
    public void run() {
        //賦值
        clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
        //更新時間
        clientCnxnSocket.updateNow();
        //更新上一次發送和接收的時間
        clientCnxnSocket.updateLastSendAndHeard();
        int to;
        long lastPingRwServer = Time.currentElapsedTime();
        //設置最大心跳ping間隔 10s
        final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
        InetSocketAddress serverAddress = null;
        while (state.isAlive()) {
            try {
                //一開始 客戶端還沒連接上服務端 嘗試初始化sasl認證並且連接服務端
                if (!clientCnxnSocket.isConnected()) {
                    // 與服務端連接斷開時 不再重建會話 直接跳出循環
                    if (closing) {
                        break;
                    }
                    if (rwServerAddress != null) {
                        serverAddress = rwServerAddress;
                        rwServerAddress = null;
                    } else {
                        //從服務端可連接集合中獲取下一個地址 如果全部嘗試過 則等待1s
                        serverAddress = hostProvider.next(1000);
                    }
                    //clientCnxnSocket作為底層與服務端通信的類
                    startConnect(serverAddress);
                    clientCnxnSocket.updateLastSendAndHeard();
                }
                //後來連接上了 說明認證也已經初始化好了 通過發送認證包給服務建立驗證
                if (state.isConnected()) {
                    // 確認是否需要發送認證失敗事件
                    if (zooKeeperSaslClient != null) {
                        boolean sendAuthEvent = false;
                        if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                            try {
                                //向服務端發送當前客戶端sasl認證初始化請求
                                zooKeeperSaslClient.initialize(ClientCnxn.this);
                            } catch (SaslException e) {
                                LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                                state = States.AUTH_FAILED;
                                sendAuthEvent = true;
                            }
                        }
                        //獲得zk的sasl認證狀態
                        KeeperState authState = zooKeeperSaslClient.getKeeperState();
                        if (authState != null) {
                            if (authState == KeeperState.AuthFailed) {
                                //與服務端進行身份驗證時發生錯誤 狀態更改且需要發送認證事件
                                state = States.AUTH_FAILED;
                                sendAuthEvent = true;
                            } else {
                                //驗證通過
                                if (authState == KeeperState.SaslAuthenticated) {
                                    sendAuthEvent = true;
                                }
                            }
                        }
                        //是否需要發送認證事件
                        if (sendAuthEvent) {
                            // 生成相應的事件 並放入事件隊列中
                            eventThread.queueEvent(new WatchedEvent(
                                Watcher.Event.EventType.None,
                                authState,null));
                            if (state == States.AUTH_FAILED) {
                                eventThread.queueEventOfDeath();
                            }
                        }
                    }
                    to = readTimeout - clientCnxnSocket.getIdleRecv();
                } else {
                    to = connectTimeout - clientCnxnSocket.getIdleRecv();
                }
                if (to <= 0) {
                    String warnInfo;
                    warnInfo = "Client session timed out, have not heard from server in "
                        + clientCnxnSocket.getIdleRecv()
                        + "ms"
                        + " for sessionid 0x"
                        + Long.toHexString(sessionId);
                    LOG.warn(warnInfo);
                    throw new SessionTimeoutException(warnInfo);
                }
                if (state.isConnected()) {
                    //1000(1 second) is to prevent race condition missing to send the second ping
                    //also make sure not to send too many pings when readTimeout is small
                    //防止丟失 所有有下一次ping
                    int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() 			- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                    //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                    if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                        //保持和服務端的心跳 發送ping
                        sendPing();
                        clientCnxnSocket.updateLastSend();
                    } else {
                        if (timeToNextPing < to) {
                            to = timeToNextPing;
                        }
                    }
                }
                // 如果當前是讀寫模式 則尋找讀寫服務器 todo
                if (state == States.CONNECTEDREADONLY) {
                    long now = Time.currentElapsedTime();
                    int idlePingRwServer = (int) (now - lastPingRwServer);
                    if (idlePingRwServer >= pingRwTimeout) {
                        lastPingRwServer = now;
                        idlePingRwServer = 0;
                        pingRwTimeout =
                            Math.min(2*pingRwTimeout, maxPingRwTimeout);
                        pingRwServer();
                    }
                    to = Math.min(to, pingRwTimeout - idlePingRwServer);
                }
				//確保所有前提都滿足 
                //取出等待隊列的頭部發送給服務端並從隊列中移除 並將其保存到pendingQueue中
                clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
            } catch (Throwable e) {
                if (closing) {
                    if (LOG.isDebugEnabled()) {
                        // closing so this is expected
                        LOG.debug("An exception was thrown while closing send thread for session 0x" + Long.toHexString(getSessionId())  + " : " + e.getMessage());
                    }
                    break;
                } else {
                   //。。。。一堆拋出錯誤
                   //根據連接狀態處理當前仍舊往隊列中投放的事件
                   cleanAndNotifyState();
                }
            }
        }
        synchronized (state) {
            //清除當前隊列中所有等待的事件 不做處理
            cleanup();
        }
       	//當連接失效 主動關閉和服務端的連接
        clientCnxnSocket.close();
        if (state.isAlive()) {
            eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                          Event.KeeperState.Disconnected, null));
        }
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                                                Event.KeeperState.Closed, null));
        ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
          "SendThread exited loop for session: 0x"  + Long.toHexString(getSessionId()));
    }
    //發送心跳
    private void sendPing() {
        lastPingSentNs = System.nanoTime();
        RequestHeader h = new RequestHeader(-2, OpCode.ping);
        queuePacket(h, null, null, null, null, null, null, null, null);
    }
    //和服務端創建連接會話
    private void startConnect(InetSocketAddress addr) throws IOException {
        saslLoginFailed = false;
        //如果之前連接過 則緩1s
        if(!isFirstConnect){
            try {
                Thread.sleep(r.nextInt(1000));
            } catch (InterruptedException e) {
                LOG.warn("Unexpected exception", e);
            }
        }
        //連接狀態改為正在連接
        state = States.CONNECTING;
        String hostPort = addr.getHostString() + ":" + addr.getPort();
        MDC.put("myid", hostPort);
        //為當前線程設置線程名稱
        setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
        //客戶端連接是否需要認證 Y:新建認證 如果認證過 斷開重新認證
        if (clientConfig.isSaslClientEnabled()) {
            try {
                if (zooKeeperSaslClient != null) {
                    zooKeeperSaslClient.shutdown();
                }
                //初始化客戶端sasl驗證 sasl狀態為初始化initial
                zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr, clientConfig),
                                                              clientConfig);
            } catch (LoginException e) {
                //在SASL客戶端初始化的過程中認證失敗了,與和zk服務端連接過程出現的認證失敗不同
                LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it.");
                //為當前認證失敗創建新的監控任務
                eventThread.queueEvent(new WatchedEvent(
                    Watcher.Event.EventType.None,
                    Watcher.Event.KeeperState.AuthFailed, null));
                saslLoginFailed = true;
            }
        }
        logStartConnect(addr);
        //與服務端通信
        clientCnxnSocket.connect(addr);
    }
}

EventThread
  1. 負責客戶端的事件處理,並觸發客戶端註冊的Watcher監聽。
  2. EventThread中的watingEvents隊列用於臨時存放那些需要被觸發的Object,包括客戶端註冊的Watcher和異步接口中註冊的回調器AsyncCallback。
  3. 同時,EventThread會不斷地從watingEvents中取出Object,識別具體類型(Watcher或AsyncCallback),並分別調用process和processResult接口方法來實現對事件的觸發和回調。
//ClientCnxn內部類:無限處理等待隊列中的監聽事務
class EventThread extends ZooKeeperThread {
    //等待處理的事件隊列
    private final LinkedBlockingQueue<Object> waitingEvents =
    new LinkedBlockingQueue<Object>();
    @Override
    @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
    public void run() {
        try {
              isRunning = true;
              while (true) {
                 Object event = waitingEvents.take();
                  //eventOfDeath代表出現了身份認證失敗 
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
                     //核心處理
                    processEvent(event);
                 }
                 if (wasKilled)
                    synchronized (waitingEvents) {
                       if (waitingEvents.isEmpty()) {
                          isRunning = false;
                          break;
                       }
                    }
              }
           }//省略日誌代碼
    }
}

由上述事件處理線程的run方法得出問題:

  1. 添加事件:隊列中的事件waitingEvents是從哪裏添加的

    EventThread內部的queueEvent,queueCallback,queuePacket,queueEventOfDeath

//客戶端訪問服務端時使用:如操作節點數據等
public void queueEvent(WatchedEvent event) {
    queueEvent(event, null);
}
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
    if (event.getType() == EventType.None && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();
    final Set<Watcher> watchers;
    if (materializedWatchers == null) {
        // 根據事件信息生成一系列的觀察者:由zk實現
        watchers = watcher.materialize(event.getState(),
                event.getType(), event.getPath());
    } else {
        watchers = new HashSet<Watcher>();
        watchers.addAll(materializedWatchers);
    }
    //將watcher集合和對應的事件組裝 執行處理時 循環watchers處理
    WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
    waitingEvents.add(pair);
}
//添加異步回調事件:TODO 什麽情況下會用到
public void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) {
    waitingEvents.add(new LocalCallback(cb, rc, path, ctx));
}
//客戶端連接出錯等情況下使用 TODO
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void queuePacket(Packet packet) {
    if (wasKilled) {
        synchronized (waitingEvents) {
            if (isRunning) waitingEvents.add(packet);
            else processEvent(packet);
        }
    } else {
        waitingEvents.add(packet);
    }
}

  1. 處理事件:processEvent(event)

    事件類型有三種:WatcherSetEventPair,LocalCallback

    核心就是調用watcher處理

WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
     watcher.process(pair.event);
}

Zookeeper(一)客戶端