Zookeeper(一)客戶端
阿新 • • 發佈:2019-04-28
是否 觀察者 信息 err 一次 getch add tor ces
Zookeeper-客戶端
例子:
// org.apache.zookeeper.ZooKeeperMain public class ZooKeeperMain { public static void main(String args[]) throws CliException, IOException, InterruptedException { //1. 初始化zk配置,並建立連接 ZooKeeperMain main = new ZooKeeperMain(args); //2. 一直等待控制臺讀入命令行 並執行 main.run(); } public ZooKeeperMain(String args[]) throws IOException, InterruptedException { //1.1 連接配置解析 cl.parseOptions(args); System.out.println("Connecting to " + cl.getOption("server")); connectToZK(cl.getOption("server")); } //1.2 建立連接 protected void connectToZK(String newHost) throws InterruptedException, IOException { //連接已經存在 關閉連接 重新創建 if (zk != null && zk.getState().isAlive()) { zk.close(); } host = newHost; boolean readOnly = cl.getOption("readonly") != null; if (cl.getOption("secure") != null) { System.setProperty(ZKClientConfig.SECURE_CLIENT, "true"); System.out.println("Secure connection is enabled"); } zk = new ZooKeeperAdmin(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly); } //1.3 自定義監聽器 private class MyWatcher implements Watcher { public void process(WatchedEvent event) { if (getPrintWatches()) { ZooKeeperMain.printMessage("WATCHER::"); ZooKeeperMain.printMessage(event.toString()); } } } }
問題
- zk怎麽體現最終一致性:
- zk的監控在客戶端和服務端的連接過程中起到什麽作用:節點更新,服務端通知客戶端,客戶端調用回調方法處理
- zk對節點的原子操作是怎麽體現的:版本控制,節點內部維護三種版本
- 客戶端與服務端連接會話中的各個狀態下 客戶端處理什麽樣的事情
基本功能
-
以樹形結構存儲數據,葉子節點可以存儲數據
- 文件系統
- 配置管理
- 命名服務
-
當某個節點的子節點變更,連接在這個節點的client可以實時監聽到變化
- 集群管理
-
client對節點操作時,是原子操作
- 遠程鎖:分布式鎖
基本術語
- States客戶端狀態:
public enum States { CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED; public boolean isAlive() { return this != CLOSED && this != AUTH_FAILED; } public boolean isConnected() { return this == CONNECTED || this == CONNECTEDREADONLY; } }
- Packet傳給服務端的數據包
//ClientCnxn內部定義的一個堆協議層的封裝,用作zk中請求和響應的載體;
static class Packet {
}
- ClientCnxnSocket底層與服務端通信類
//真正與服務端連接的抽象類;有兩個子類分別使用jdk.nio/netty.nio實現會話操作
abstract class ClientCnxnSocket {
abstract void connect(InetSocketAddress addr) throws IOException;
//會從outgoingQueue中取出一個可發送的Packet對象,
//同時生成一個客戶端請求序號XID並將其設置到Packet請求頭中去,
//然後序列化後再發送,請求發送完畢後,會立即將該Packet保存到pendingQueue中,
//以便等待服務端響應返回後進行相應的處理。
abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue,
ClientCnxn cnxn) throws IOException, InterruptedException;
}
- WatchedEvent監控事件
//包含發生的事件,zookeeper當前狀態信息,事件涉及的節點路徑
public class WatchedEvent {
final private KeeperState keeperState;
final private EventType eventType;
private String path;
}
- Watcher:事件處理類的基本父類
- KeeperState:Event事件中Zookeeper可能存在的所有狀態
- EventType:Zookeeper中各種Event類型
- WatcherType:
//內部包含兩個類Event,WatchType
public interface Watcher {
}
與服務端建立連接
ZooKeeperAdmin
//主要用於集群的管理任務,如重配置集群成員;
@InterfaceAudience.Public
public class ZooKeeperAdmin extends ZooKeeper {
//ZooKeeperAdmin構造器最終調用父類構造器
public ZooKeeperAdmin(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException {
super(connectString, sessionTimeout, watcher, canBeReadOnly);
}
}
Zookeeper:客戶端
//功能:1.初始化服務客戶端連接服務端 1).創建客戶端對象 2).啟動客戶端內部線程
// 2.提供操作數據功能 1).向服務端發送請求
@InterfaceAudience.Public
public class ZooKeeper implements AutoCloseable {
protected final ClientCnxn cnxn;
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly, HostProvider aHostProvider)
throws IOException {
this(connectString, sessionTimeout, watcher, canBeReadOnly,
aHostProvider, null);
}
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly, HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
//解析連接ip:port
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
hostProvider = aHostProvider;
//1. 創建管理連接的客戶端 ChrootPath為客戶端自定義的路徑頭
cnxn = createConnection(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
//2. 啟動客戶端內部線程
cnxn.start();
}
protected ClientCnxn createConnection(String chrootPath,
HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
boolean canBeReadOnly) throws IOException {
return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
watchManager, clientCnxnSocket, canBeReadOnly);
}
}
ClientCnxn
//維護服務端和客戶端之間的網絡連接,並進行一系列的網絡通信:維護一個可用服務器的列表,當某客戶端需要時可透明的切換服務
public class ClientCnxn {
final SendThread sendThread;
final EventThread eventThread;
//客戶端可以連接的服務端地址集合
private final HostProvider hostProvider;
//需要發送給服務端的數據包:最終通過SendThread調用clientCnxnSocket.doTransport發送給服務端
private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
//已經發送給服務端但還未得到響應的數據包集合
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread();
this.clientConfig = zooKeeper.getClientConfig();
initRequestTimeout();
}
public void start() {
// 0.建立連接會話 1.sasl驗證 startConnect
// 2.創建監聽事件到事件隊列中 3.保持心跳
sendThread.start();
// 處理事件隊列中的事件
eventThread.start();
}
}
SendThread
- 維護了客戶端與服務端之間的會話生命周期(通過一定周期頻率內向服務端發送PING包檢測心跳),如果會話周期內客戶端與服務端出現TCP連接斷開,那麽就會自動且透明地完成重連操作。
- 管理了客戶端所有的請求發送和響應接收操作,其將上層客戶端API操作轉換成相應的請求協議並發送到服務端,並完成對同步調用的返回和異步調用的回調。
- 將來自服務端的事件傳遞給EventThread去處理。
//ClientCnxn內部類:為傳出請求隊列服務並生成心跳
class SendThread extends ZooKeeperThread {
private final ClientCnxnSocket clientCnxnSocket;
private InetSocketAddress rwServerAddress = null;
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}
@Override
public void run() {
//賦值
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
//更新時間
clientCnxnSocket.updateNow();
//更新上一次發送和接收的時間
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
//設置最大心跳ping間隔 10s
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
//一開始 客戶端還沒連接上服務端 嘗試初始化sasl認證並且連接服務端
if (!clientCnxnSocket.isConnected()) {
// 與服務端連接斷開時 不再重建會話 直接跳出循環
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
//從服務端可連接集合中獲取下一個地址 如果全部嘗試過 則等待1s
serverAddress = hostProvider.next(1000);
}
//clientCnxnSocket作為底層與服務端通信的類
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
//後來連接上了 說明認證也已經初始化好了 通過發送認證包給服務建立驗證
if (state.isConnected()) {
// 確認是否需要發送認證失敗事件
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
//向服務端發送當前客戶端sasl認證初始化請求
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
//獲得zk的sasl認證狀態
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
//與服務端進行身份驗證時發生錯誤 狀態更改且需要發送認證事件
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
//驗證通過
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
//是否需要發送認證事件
if (sendAuthEvent) {
// 生成相應的事件 並放入事件隊列中
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
if (state == States.AUTH_FAILED) {
eventThread.queueEventOfDeath();
}
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
//防止丟失 所有有下一次ping
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
//保持和服務端的心跳 發送ping
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
// 如果當前是讀寫模式 則尋找讀寫服務器 todo
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
//確保所有前提都滿足
//取出等待隊列的頭部發送給服務端並從隊列中移除 並將其保存到pendingQueue中
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()) {
// closing so this is expected
LOG.debug("An exception was thrown while closing send thread for session 0x" + Long.toHexString(getSessionId()) + " : " + e.getMessage());
}
break;
} else {
//。。。。一堆拋出錯誤
//根據連接狀態處理當前仍舊往隊列中投放的事件
cleanAndNotifyState();
}
}
}
synchronized (state) {
//清除當前隊列中所有等待的事件 不做處理
cleanup();
}
//當連接失效 主動關閉和服務端的連接
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Closed, null));
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}
//發送心跳
private void sendPing() {
lastPingSentNs = System.nanoTime();
RequestHeader h = new RequestHeader(-2, OpCode.ping);
queuePacket(h, null, null, null, null, null, null, null, null);
}
//和服務端創建連接會話
private void startConnect(InetSocketAddress addr) throws IOException {
saslLoginFailed = false;
//如果之前連接過 則緩1s
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
//連接狀態改為正在連接
state = States.CONNECTING;
String hostPort = addr.getHostString() + ":" + addr.getPort();
MDC.put("myid", hostPort);
//為當前線程設置線程名稱
setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
//客戶端連接是否需要認證 Y:新建認證 如果認證過 斷開重新認證
if (clientConfig.isSaslClientEnabled()) {
try {
if (zooKeeperSaslClient != null) {
zooKeeperSaslClient.shutdown();
}
//初始化客戶端sasl驗證 sasl狀態為初始化initial
zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr, clientConfig),
clientConfig);
} catch (LoginException e) {
//在SASL客戶端初始化的過程中認證失敗了,與和zk服務端連接過程出現的認證失敗不同
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it.");
//為當前認證失敗創建新的監控任務
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
logStartConnect(addr);
//與服務端通信
clientCnxnSocket.connect(addr);
}
}
EventThread
- 負責客戶端的事件處理,並觸發客戶端註冊的Watcher監聽。
- EventThread中的watingEvents隊列用於臨時存放那些需要被觸發的Object,包括客戶端註冊的Watcher和異步接口中註冊的回調器AsyncCallback。
- 同時,EventThread會不斷地從watingEvents中取出Object,識別具體類型(Watcher或AsyncCallback),並分別調用process和processResult接口方法來實現對事件的觸發和回調。
//ClientCnxn內部類:無限處理等待隊列中的監聽事務
class EventThread extends ZooKeeperThread {
//等待處理的事件隊列
private final LinkedBlockingQueue<Object> waitingEvents =
new LinkedBlockingQueue<Object>();
@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
//eventOfDeath代表出現了身份認證失敗
if (event == eventOfDeath) {
wasKilled = true;
} else {
//核心處理
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}//省略日誌代碼
}
}
由上述事件處理線程的run方法得出問題:
-
添加事件:隊列中的事件waitingEvents是從哪裏添加的
EventThread內部的queueEvent,queueCallback,queuePacket,queueEventOfDeath
//客戶端訪問服務端時使用:如操作節點數據等
public void queueEvent(WatchedEvent event) {
queueEvent(event, null);
}
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
if (event.getType() == EventType.None && sessionState == event.getState()) {
return;
}
sessionState = event.getState();
final Set<Watcher> watchers;
if (materializedWatchers == null) {
// 根據事件信息生成一系列的觀察者:由zk實現
watchers = watcher.materialize(event.getState(),
event.getType(), event.getPath());
} else {
watchers = new HashSet<Watcher>();
watchers.addAll(materializedWatchers);
}
//將watcher集合和對應的事件組裝 執行處理時 循環watchers處理
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
waitingEvents.add(pair);
}
//添加異步回調事件:TODO 什麽情況下會用到
public void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) {
waitingEvents.add(new LocalCallback(cb, rc, path, ctx));
}
//客戶端連接出錯等情況下使用 TODO
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void queuePacket(Packet packet) {
if (wasKilled) {
synchronized (waitingEvents) {
if (isRunning) waitingEvents.add(packet);
else processEvent(packet);
}
} else {
waitingEvents.add(packet);
}
}
-
處理事件:processEvent(event)
事件類型有三種:WatcherSetEventPair,LocalCallback
核心就是調用watcher處理
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
watcher.process(pair.event);
}
Zookeeper(一)客戶端