ZooKeeper Source Code Reading (9): ZK Client-Server (2)
Preface
The previous post described the overall flow of how the client and the server establish a connection. This post and the next one expand on the parts that were only sketched there and fill in the details.
Initialization phase
The initialization phase mainly instantiates the important helper classes inside the ZooKeeper class. The previous post already covered this in some detail, so only a few points are added here:
ClientCnxn initialization
cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
As the constructor shows, one very important parameter of ClientCnxn is the ClientCnxnSocket, the class through which the client actually connects to the server. Here is how it is obtained.
private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
    // read the system property
    String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
    // if nothing is configured, the NIO implementation is used as the default
    if (clientCnxnSocketName == null) {
        clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
    }
    try {
        // instantiate the implementation via reflection
        return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).getDeclaredConstructor()
                .newInstance();
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate " + clientCnxnSocketName);
        ioe.initCause(e);
        throw ioe;
    }
}
As you can see, the ClientCnxnSocket instance is obtained via reflection.
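Because the class name comes from a system property, the socket implementation can be swapped without touching any code. A minimal sketch, assuming a ZooKeeper release that ships ClientCnxnSocketNetty (3.5+) and a server on 127.0.0.1:2181 (both assumptions, not part of the original post):

import org.apache.zookeeper.ZooKeeper;

public class SocketImplDemo {
    public static void main(String[] args) throws Exception {
        // Must be set before the ZooKeeper handle (and thus ClientCnxn) is created,
        // because getClientCnxnSocket() reads the property at construction time.
        System.setProperty("zookeeper.clientCnxnSocket",
                "org.apache.zookeeper.ClientCnxnSocketNetty");
        // Without the property, ClientCnxnSocketNIO is used by default.
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });
        System.out.println("state = " + zk.getState());
        zk.close();
    }
}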
If you remember, it was clientCnxn that sent the watcher registrations earlier; in fact all traffic between the client and the server goes through this class, via methods such as doTransport and doIO, which use NIO under the hood. I will come back and fill this part in once I have a better grasp of NIO and Netty.
The wait in StaticHostProvider
public InetSocketAddress next(long spinDelay) {
//currentIndex is incremented on every attempt
++currentIndex;
//wrapped around after trying every server
if (currentIndex == serverAddresses.size()) {
currentIndex = 0;
}
//after a full unsuccessful round, currentIndex == lastIndex
if (currentIndex == lastIndex && spinDelay > 0) {
try {
Thread.sleep(spinDelay);
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
} else if (lastIndex == -1) {
// We don't want to sleep on the first ever connect attempt.
lastIndex = 0;
}
return serverAddresses.get(currentIndex);
}
This method looked odd at first: a next() method usually takes no arguments, yet this one takes a delay. Looking closer, there is a deliberate reason: if every server has been tried and none could be connected to, the method sleeps for spinDelay before the next attempt.
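A small illustrative sketch of that behaviour, using the real StaticHostProvider with three made-up addresses (the addresses and the 1000 ms spinDelay are assumptions for the example; the internal order of the list may be shuffled):

import java.net.InetSocketAddress;
import java.util.Arrays;
import org.apache.zookeeper.client.StaticHostProvider;

public class HostProviderDemo {
    public static void main(String[] args) throws Exception {
        StaticHostProvider provider = new StaticHostProvider(Arrays.asList(
                new InetSocketAddress("10.0.0.1", 2181),
                new InetSocketAddress("10.0.0.2", 2181),
                new InetSocketAddress("10.0.0.3", 2181)));
        long start = System.currentTimeMillis();
        // With three servers and no successful connection, the fourth call closes the
        // circle (currentIndex == lastIndex) and therefore sleeps spinDelay = 1000 ms.
        for (int i = 0; i < 4; i++) {
            InetSocketAddress addr = provider.next(1000);
            System.out.println(addr + " returned after " + (System.currentTimeMillis() - start) + " ms");
        }
        // Calling provider.onConnected() after a successful connect resets lastIndex,
        // so the "full round" is counted from the server we actually connected to.
    }
}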
Connection establishment phase
As the previous post said, the establishment process starts when the ZooKeeper constructor calls clientCnxn.start(), i.e. when sendThread and eventThread start running.
public void start() {
sendThread.start();
eventThread.start();
}
The connection is established mostly by SendThread: calling start() effectively runs SendThread's run method. That method is fairly involved, so it is clearer to start with SendThread's other methods and go through run at the end. SendThread's responsibilities were described before; here is someone else's summary, worth keeping in mind while reading the methods below:
- It maintains the session lifecycle between client and server (sending PING packets at a fixed interval as heartbeats); if the TCP connection breaks during the session, it automatically and transparently reconnects.
- It manages all request sending and response receiving on the client side: it turns upper-level client API calls into the corresponding request protocol, sends them to the server, and completes the return of synchronous calls and the callbacks of asynchronous calls.
- It hands events coming from the server over to EventThread for processing.
Let's now go through a few of the more important methods:
sendPing
This method implements the first responsibility above: it keeps the ping, i.e. the heartbeat, going between client and server.
private void sendPing() {
lastPingSentNs = System.nanoTime();//lastPingSentNs records the time of the last ping (in nanos)
RequestHeader h = new RequestHeader(-2, OpCode.ping);//build the special ping request header (xid -2)
queuePacket(h, null, null, null, null, null, null, null, null);//put the packet onto the outgoingQueue
}
One thing to note here:
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
This snippet is from ClientCnxnSocketNIO's doIO method; the point is that if the header is a ping, an auth packet, or null, the packet is not added to the pendingQueue after it is sent.
In SendThread's readResponse, ping and auth replies get special handling; that method was analysed in part eight.
if (replyHdr.getXid() == -2) {
    // -2 is the xid for pings
    ...
}
if (replyHdr.getXid() == -4) {
    // -4 is the xid for AuthPacket
    ...
}
pingRwServer
This method is called when the client is connected to a read-only server; it tries to find a read/write server among the ones in the hostProvider.
private void pingRwServer() throws RWServerFoundException {
String result = null;
InetSocketAddress addr = hostProvider.next(0);//next server address
LOG.info("Checking server " + addr + " for being r/w." +
" Timeout " + pingRwTimeout);
Socket sock = null;
BufferedReader br = null;
try {
//set up the probe socket
sock = new Socket(addr.getHostName(), addr.getPort());
sock.setSoLinger(false, -1);
sock.setSoTimeout(1000);
sock.setTcpNoDelay(true);
sock.getOutputStream().write("isro".getBytes());
sock.getOutputStream().flush();
sock.shutdownOutput();
br = new BufferedReader(
new InputStreamReader(sock.getInputStream()));//reader for the server's reply
result = br.readLine();//read the reply
} catch (ConnectException e) {
// ignore, this just means server is not up
} catch (IOException e) {
// some unexpected error, warn about it
LOG.warn("Exception while seeking for r/w server " +
e.getMessage(), e);
} finally {
//cleanup guards
if (sock != null) {
try {
sock.close();
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
}
}
if (br != null) {
try {
br.close();
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
}
}
}
//a read/write server was found
if ("rw".equals(result)) {
pingRwTimeout = minPingRwTimeout;
// save the found address so that it's used during the next
// connection attempt
rwServerAddress = addr;//save this server's address by updating rwServerAddress
throw new RWServerFoundException("Majority server found at "
+ addr.getHostName() + ":" + addr.getPort());//a read/write server was found: throw, so that run() reconnects to rwServerAddress
}
}
The catch and handling of that exception looks like this:
else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else {
LOG.warn(
"Session 0x"
+ Long.toHexString(getSessionId())
+ " for server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", unexpected error"
+ RETRY_CONN_MSG, e);
}
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));//queue a Disconnected event on waitingEvents
}
You can see that a Disconnected event is put onto the queue here; the connection is then cleaned up and re-established by the send loop, and the server it reconnects to is the rwServerAddress that was saved just before.
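The probe itself is simply the plain-text "isro" four-letter command. A standalone sketch of the same check, assuming a server on 127.0.0.1:2181 (and, on newer releases, that the command is allowed by the 4lw whitelist):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;

public class IsRoProbe {
    public static void main(String[] args) throws Exception {
        try (Socket sock = new Socket("127.0.0.1", 2181)) {
            sock.setSoTimeout(1000);
            // the same bytes that pingRwServer sends
            sock.getOutputStream().write("isro".getBytes());
            sock.getOutputStream().flush();
            sock.shutdownOutput();
            BufferedReader br = new BufferedReader(new InputStreamReader(sock.getInputStream()));
            // the server answers "rw" for a read/write server, "ro" for a read-only one
            System.out.println("server answered: " + br.readLine());
        }
    }
}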
startConnect
As the name suggests, this method establishes the connection.
private void startConnect() throws IOException {
// initializing it for new connection
//reset connection-related state
saslLoginFailed = false;
state = States.CONNECTING;
InetSocketAddress addr;//address to connect to
if (rwServerAddress != null) {
addr = rwServerAddress;//use the read/write server address found earlier
rwServerAddress = null;//clear it so that after a disconnect the next attempt can pick a different server
} else {
addr = hostProvider.next(1000);//no read/write address saved, so take the next one from hostProvider
}
setName(getName().replaceAll("\\(.*\\)",
"(" + addr.getHostName() + ":" + addr.getPort() + ")"));//設定執行緒的名字
if (ZooKeeperSaslClient.isEnabled()) {//SASL is enabled; SASL itself deserves a closer look some other time
try {
//initialize the corresponding user name and SASL client
String principalUserName = System.getProperty(
ZK_SASL_CLIENT_USERNAME, "zookeeper");
zooKeeperSaslClient =
new ZooKeeperSaslClient(
principalUserName+"/"+addr.getHostName());
} catch (LoginException e) {
// An authentication error occurred when the SASL client tried to initialize:
// for Kerberos this means that the client failed to authenticate with the KDC.
// This is different from an authentication error that occurs during communication
// with the Zookeeper server, which is handled below.
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
logStartConnect(addr);//log the attempt
clientCnxnSocket.connect(addr);//start the socket connection
}
To summarise, the main steps are:
- initialise the relevant state;
- pick the server address to connect to;
- SASL handling: initialisation and error handling;
- logging;
- connect.
primeConnection
A quick look at the code first:
void primeConnection() throws IOException {
LOG.info("Socket connection established to "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", initiating session");
isFirstConnect = false;
//seenRwServerBefore is set to true the first time the client connects to a read/write server
long sessId = (seenRwServerBefore) ? sessionId : 0;//if we have connected to a r/w server before, reuse the old sessionId; otherwise 0
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);//build the ConnectRequest
synchronized (outgoingQueue) {
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
// TODO: here we have the only remaining use of zooKeeper in
// this class. It's to be eliminated!
//after (re)connecting to a r/w server, put all watch and auth information onto the outgoingQueue so it can be sent to the server for re-synchronisation
if (!disableAutoWatchReset) {//is automatic watch reset enabled
List<String> dataWatches = zooKeeper.getDataWatches();
List<String> existWatches = zooKeeper.getExistWatches();
List<String> childWatches = zooKeeper.getChildWatches();
if (!dataWatches.isEmpty()
|| !existWatches.isEmpty() || !childWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
long setWatchesLastZxid = lastZxid;
//iterate over the watch sets
while (dataWatchesIter.hasNext()
|| existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList<String>();
List<String> existWatchesBatch = new ArrayList<String>();
List<String> childWatchesBatch = new ArrayList<String>();
int batchLength = 0;
// Note, we may exceed our max length by a bit when we add the last
// watch in the batch. This isn't ideal, but it makes the code simpler.
//a batch can be at most 128 kB
while (batchLength < SET_WATCHES_MAX_LENGTH) {
final String watch;
if (dataWatchesIter.hasNext()) {
watch = dataWatchesIter.next();
dataWatchesBatch.add(watch);
} else if (existWatchesIter.hasNext()) {
watch = existWatchesIter.next();
existWatchesBatch.add(watch);
} else if (childWatchesIter.hasNext()) {
watch = childWatchesIter.next();
childWatchesBatch.add(watch);
} else {
break;
}
batchLength += watch.length();
}
//build the SetWatches request
SetWatches sw = new SetWatches(setWatchesLastZxid,
dataWatchesBatch,
existWatchesBatch,
childWatchesBatch);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setWatches);//set the request type
h.setXid(-8);
Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
outgoingQueue.addFirst(packet);//push it onto the front of outgoingQueue
}
}
}
//put the auth information onto the outgoingQueue
for (AuthData id : authInfo) {
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
OpCode.auth), null, new AuthPacket(0, id.scheme,
id.data), null, null));
}
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly));
}
//enable read/write on the socket so the queued packets get sent
clientCnxnSocket.enableReadWriteOnly();
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request sent on "
+ clientCnxnSocket.getRemoteSocketAddress());//log it
}
}
So primeConnection's job is essentially to re-synchronise the watch and auth information after (re)connecting to a r/w server. There are three main steps: 1. decide the sessionId; 2. build the data to synchronise and put it onto the outgoingQueue; 3. enable read/write.
One detail worth stressing:
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
After the auth data has been handled above, the packet carrying the connect data is pushed to the very front of the queue. This makes sure the connect request is the first thing sent (and therefore that the first response is the one handled by ClientCnxnSocket#readConnectResult). This follows the referenced post, but I still have a question: how exactly is it guaranteed that the first packet received is the reply to this connect packet?
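On the ordering itself, a tiny illustration may help (a plain ArrayDeque standing in for the real outgoingQueue, strings standing in for packets): because each group is added with addFirst and the connect request is added last, the connect request ends up at the head and goes out first; since the server answers the requests on a connection in order, and ClientCnxnSocketNIO routes the first complete reply to readConnectResult while its initialized flag is still false, the connect reply is the first one processed.

import java.util.ArrayDeque;
import java.util.Deque;

public class AddFirstOrderDemo {
    public static void main(String[] args) {
        Deque<String> outgoing = new ArrayDeque<>();
        // same insertion order as primeConnection, each with addFirst()
        outgoing.addFirst("setWatches");
        outgoing.addFirst("auth");
        outgoing.addFirst("connect");
        // packets are taken from the head when sending, so the connect request goes out first
        System.out.println(outgoing); // prints [connect, auth, setWatches]
    }
}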
The relation between startConnect and primeConnection (quoted):
The difference lies in the NIO SelectionKey interest set:
the former is limited to connect (and accept);
the latter runs after the connection has completed, turning on write and read, ready to start socket I/O with the zk server.
One post describes the overall establishment process well, quoted here:
- Start SendThread.
- Connect to a server (see SendThread.startConnect).
- Create the actual socket, see ClientCnxnSocketNIO.createSock.
- Register an OP_CONNECT event with the selector and connect to the server; because the connect is non-blocking it may not complete immediately. If it does, SendThread.primeConnection is called to initialise the connection and register read/write events; otherwise that happens later when the select loop reports the connect event.
- Reset the socket's incomingBuffer.
- Once connected, a connect-type request is sent to the server to obtain the sessionid for this connection.
- Enter the loop waiting for requests from the application; if there are none, ping the server based on the elapsed time.
The reason for quoting this is that it explains the difference between startConnect and primeConnection quite well: in the second step, startConnect establishes the connection and primeConnection is then called; startConnect gets the socket connected and able to receive, while primeConnection() mainly initialises the session, watch and auth information and registers ClientCnxnSocketNIO's interest in read and write events.
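To make the OP_CONNECT vs. read/write distinction concrete, here is a stripped-down NIO sketch of the same two stages. This is not ZooKeeper's actual code; the address 127.0.0.1:2181 and the 1-second select timeout are assumptions:

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class NonBlockingConnectSketch {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        SocketChannel sock = SocketChannel.open();
        sock.configureBlocking(false);
        sock.socket().setTcpNoDelay(true);
        // "startConnect" stage: register interest in OP_CONNECT only, then connect non-blockingly
        SelectionKey key = sock.register(selector, SelectionKey.OP_CONNECT);
        boolean immediateConnect = sock.connect(new InetSocketAddress("127.0.0.1", 2181));
        if (immediateConnect) {
            // connected right away (possible on loopback): go straight to the "primeConnection" stage
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        } else {
            // otherwise wait for the selector to report the connect event, then finish it
            selector.select(1000);
            if (key.isConnectable() && sock.finishConnect()) {
                // "primeConnection" stage: switch interest to read/write so the
                // session handshake can be written and its reply read
                key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            }
        }
        System.out.println("connected = " + sock.isConnected());
        sock.close();
        selector.close();
    }
}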
onConnected
The comment and the method name make it clear that this is the callback invoked once the socket connection has been established.
/**
* Callback invoked by the ClientCnxnSocket once a connection has been
* established.
*
* @param _negotiatedSessionTimeout
* @param _sessionId
* @param _sessionPasswd
* @param isRO
* @throws IOException
*/
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;//store the negotiated session timeout
if (negotiatedSessionTimeout <= 0) {//the server did not accept the session (it has expired)
state = States.CLOSED;//state -> CLOSED
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();//the Expired event was queued above; this adds the event representing "death" to the waitingEvents queue so EventThread will terminate
String warnInfo;
warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
+ Long.toHexString(sessionId) + " has expired";
LOG.warn(warnInfo);
throw new SessionExpiredException(warnInfo);//log and throw
}
if (!readOnly && isRO) {
LOG.error("Read/write client got connected to read-only server");
}
readTimeout = negotiatedSessionTimeout * 2 / 3; //why is the read timeout set to 2/3 of the real timeout? (see the worked numbers below)
connectTimeout = negotiatedSessionTimeout / hostProvider.size();//split the timeout evenly across the servers
hostProvider.onConnected();//update the state inside hostProvider
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
state = (isRO) ?
States.CONNECTEDREADONLY : States.CONNECTED;//set the connection state and session info
seenRwServerBefore |= !isRO;
LOG.info("Session establishment complete on server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", sessionid = 0x" + Long.toHexString(sessionId)
+ ", negotiated timeout = " + negotiatedSessionTimeout
+ (isRO ? " (READ-ONLY mode)" : ""));//log it
KeeperState eventState = (isRO) ?
KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;//read-only connection or not
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
eventState, null));//queue the connection-established event
}
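To make the timeout relationships concrete, a worked example with assumed numbers (a 30 s negotiated session timeout and three servers in the connect string, both assumptions for illustration):

public class TimeoutMath {
    public static void main(String[] args) {
        int negotiatedSessionTimeout = 30000; // assumed: the server granted 30 s
        int serverCount = 3;                  // assumed: 3 servers in the connect string
        int readTimeout = negotiatedSessionTimeout * 2 / 3;          // 20000 ms
        int connectTimeout = negotiatedSessionTimeout / serverCount; // 10000 ms per server
        // run() (see below) sends a ping after roughly readTimeout / 2 of send-idle time,
        // so the client notices a silent server long before the session itself expires.
        int pingAfter = readTimeout / 2;                              // 10000 ms
        System.out.println(readTimeout + " " + connectTimeout + " " + pingAfter);
    }
}

So with a 30 s session, the client gives up on reads after 20 s of silence, spends at most 10 s trying each server, and pings roughly every 10 s, which keeps the session alive with margin to spare.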
readResponse
This method is fairly long, so let's analyse it piece by piece.
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);//archive for deserialisation
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");//parse out the header
Once the reply header has been parsed, the actual handling starts.
if (replyHdr.getXid() == -2) {//reply to a ping
// -2 is the xid for pings
if (LOG.isDebugEnabled()) {
LOG.debug("Got ping response for sessionid: 0x"
+ Long.toHexString(sessionId)
+ " after "
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
+ "ms");//打log
}
return;
}
if (replyHdr.getXid() == -4) {//reply to an auth packet
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {//authentication failed
state = States.AUTH_FAILED;//update the state
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );//put an AuthFailed event onto the waitingEvents queue
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got auth sessionid:0x"
+ Long.toHexString(sessionId));//log it
}
return;
}
if (replyHdr.getXid() == -1) {//a notification from the server
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));//log it
}
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");//把server端地址為chrootPath作為根節點
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));//strip the chroot prefix to get the client-side path
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);//the server path is shorter than chrootPath, which is abnormal, so log a warning
}
}
WatchedEvent we = new WatchedEvent(event);//build a WatchedEvent from the WatcherEvent
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
eventThread.queueEvent( we );//put it onto the waitingEvents queue
return;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
//while SASL authentication is in progress, respond to the server immediately instead of queueing
if (clientTunneledAuthenticationInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia,"token");
zooKeeperSaslClient.respondToServer(request.getToken(),
ClientCnxn.this);
return;
}
Packet packet;
synchronized (pendingQueue) {//work through the pendingQueue
//From the analysis above we know auth, ping and in-flight SASL packets never get here (they are not in the pendingQueue), and triggered watches are not in the pendingQueue either: they are pushed by the server and looked up in the watch manager. Packets with asynchronous AsyncCallbacks, however, are in the pendingQueue.
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());//nothing pending
}
packet = pendingQueue.remove();//take the head packet
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {//out of order
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid "
+ replyHdr.getXid() + " with err " +
+ replyHdr.getErr() +
" expected Xid "
+ packet.requestHeader.getXid()
+ " for a packet with details: "
+ packet );
}
//copy the reply fields into the packet
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");//deserialise the response body
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);//打log
}
} finally {
finishPacket(packet);//put the packet onto the waitingEvents queue; as mentioned when discussing the watcher mechanism, this is how a packet with a registered watcher ends up in the queue
}
To summarise:
- deserialise the reply header;
- handle the reply according to the header: ping (-2), auth (-4) and SASL replies are handled directly and return without entering the waitingEvents queue;
- a notification (-1) from the server is an event and is put onto the queue;
- otherwise it is the reply to an already-sent packet, taken from the head of the pendingQueue.
run
run is the core method of SendThread, and of connection establishment as a whole. It is also quite long, so let's go through it section by section.
@Override
public void run() {
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
Clearly, this opening section is just initialisation work related to clientCnxnSocket.
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {//not connected yet
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {//this comes up again at the end of this section
break;
}
startConnect();//connect
clientCnxnSocket.updateLastSendAndHeard();
}
Next it checks the connection to the server: if there is no connection it calls startConnect() to connect; if it is already connected, it periodically sends pings as heartbeats.
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {//SASL state
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);//SASL initialisation, to be studied later
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();//authentication state
if (authState != null) {
if (authState == KeeperState.AuthFailed) {//authentication failed
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent == true) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
//readTimeout = 2/3 of the session timeout
to = readTimeout - clientCnxnSocket.getIdleRecv();//connected: the read budget minus the time since the last read
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();//not connected: the connect budget minus the time since the last read; either way, `to` tells us whether the expected timeout has been exceeded
}
if (to <= 0) {//timed out
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);//log and throw
}
if (state.isConnected()) {//connected
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
//compute the time until the next ping; the extra term is a small adjustment for the cases described in the English comment above
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {//half of readTimeout has passed since the last send, or nothing has been sent for more than ten seconds
sendPing();//send the ping
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {//if the next ping is due before the current timeout budget runs out
to = timeToNextPing;
}
}
}
Next it decides what to do based on the kind of server it is connected to: if it is a read-only server, it automatically goes looking for a read/write server.
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {//CONNECTEDREADONLY means the server we are attached to is read-only
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);//time since we last probed for a read/write server
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;//record the time of this probe
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();//try to find a read/write server
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
Then it sends the requests queued in the outgoingQueue and reads the server's replies.
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
Finally there is some cleanup and handling of the disconnect. At this point we are outside the loop above; a couple of things are worth noting:
cleanup();//clean up the connection and the various queues
clientCnxnSocket.close();//close the connection
if (state.isAlive()) {//1️⃣
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));//queue a Disconnected event
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));//log it
1️⃣ The loop above runs while (state.isAlive()), so why check the state again after leaving the loop? Because of this:
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
Near the top of the run loop, the closing flag is checked; closing is set to true only after the client has actively sent the request to close the session. So the only way the while loop can exit while state is still alive is that the client deliberately closed the connection, and in that case a Disconnected event is handed to eventThread to process.
To summarise, run does the following:
- initialise clientCnxnSocket and the related parameters;
- if the client is not connected to a server, try to connect;
- whether connected or not, check whether the connection has timed out;
- if connected, periodically send heartbeats to check the connection to the server;
- if connected to a read-only server, actively look for a read/write server;
- send the requests in the outgoingQueue and receive the server's replies;
- do the cleanup work when the connection is torn down.
That more or less wraps up the establishment phase. I feel the main flow and handling are now reasonably clear, but there are a great many details along the way that will need a closer look if and when they become relevant.
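Before moving on to the open questions, here is what the whole establishment phase looks like from the API consumer's side. A minimal sketch, assuming a server on 127.0.0.1:2181 and a 30 s session timeout; the latch is released when the default watcher sees SyncConnected, i.e. once onConnected has queued that event:

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        // the constructor returns immediately; SendThread/EventThread perform the
        // handshake in the background and deliver SyncConnected via the default watcher
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });
        System.out.println("state right after the constructor: " + zk.getState()); // usually CONNECTING
        connected.await();
        System.out.println("session id = 0x" + Long.toHexString(zk.getSessionId()));
        zk.close();
    }
}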
Thoughts
The request/response ordering in primeConnection
As discussed above: how exactly is the ordering guaranteed?
R/W and read-only modes
Under what conditions does a server run in each of these two modes?
How the session works
sessId, sessionId
sessId is in fact just sessionId. seenRwServerBefore is set to true the first time the client connects to a read/write server; before any connection exists, sessId is 0, so the ConnectRequest built for the very first connection carries sessionId 0.
Requests that are not in the pendingQueue
auth and ping requests, as well as in-flight SASL packets, are not added to the pendingQueue, and neither are triggered watches. From the notification-event discussion referenced in the previous post, a triggered watch is a proactive notification from the server and is looked up in the watch manager, so it never sits in the pendingQueue. As for auth and ping: in part eight, the special xid values in replyHdr were not clear to me and I raised the question in that post's Thoughts section; now we can see those values belong to auth and ping.
sendPing and pingRwServer
The former is the heartbeat; the latter tries to find a read/write server after the client has connected to a read-only one.
The overall connection process
Connecting to the ZooKeeper server involves establishing two layers of connection:
- the TCP connection between the client and the server;
- the session association built on top of that TCP connection. After the TCP connection is up, the client sends a ConnectRequest asking to establish a session; the server assigns a sessionId and a password to the client and starts checking whether that session times out.
- If the TCP connection breaks before the sessionTimeout has elapsed, the server still considers the sessionId alive. The client then picks the next ZooKeeper server address and establishes a new TCP connection; once it is up, the client sends a ConnectRequest carrying the previous sessionId and password, and if the session has not yet timed out, the automatic reconnect succeeds. All of this is transparent to the user: it happens quietly in the background and the ZooKeeper handle remains valid.
If, by the time the TCP connection has been re-established, the sessionId has already timed out (the server has cleaned up all data associated with it), the server returns a sessionTimeout of 0, a sessionId of 0 and an empty password. On receiving this, the client checks whether the negotiated sessionTimeout is less than or equal to 0; if so, the eventThread first delivers a KeeperState.Expired event to the relevant watchers, then ends its loop and shuts down. At that point the ZooKeeper handle is invalid: a new ZooKeeper object must be created, which will be assigned a new sessionId.
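The sessionId/password pair is also exposed through the client API, so a restarted client process can attach to its old session as long as it has not expired on the server. A small sketch, assuming a server on 127.0.0.1:2181 (the sleeps are a crude stand-in for waiting on SyncConnected):

import org.apache.zookeeper.ZooKeeper;

public class SessionReuseDemo {
    public static void main(String[] args) throws Exception {
        // first handle: the server assigns a sessionId and password during the handshake
        ZooKeeper zk1 = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });
        Thread.sleep(1000);
        long sessionId = zk1.getSessionId();
        byte[] sessionPasswd = zk1.getSessionPasswd();
        // a second handle (e.g. after a client restart) can attach to the same session
        // by passing the saved sessionId and password to the constructor
        ZooKeeper zk2 = new ZooKeeper("127.0.0.1:2181", 30000, event -> { },
                sessionId, sessionPasswd);
        Thread.sleep(1000);
        System.out.println("reattached to session 0x" + Long.toHexString(zk2.getSessionId()));
        zk2.close();
    }
}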
References
https://www.jianshu.com/p/f69e6de5f169
http://www.cnblogs.com/leesf456/p/6098255.html
https://my.oschina.net/pingpangkuangmo/blog/486780
https://blog.csdn.net/cnh294141800/article/details/53039482
https://www.cnblogs.com/shangxiaofei/p/7171882.html
《從Paxos到Zookeeper》