Zookeeper Source Code Reading (9): ZK Client-Server (2)
Preface
The previous post went through the flow of establishing the client-server connection at a coarse level; this post and the next expand on the places it glossed over and fill in the details.
Initialization Phase
The initialization phase mainly instantiates the important functional classes inside the ZooKeeper class; earlier posts already described this process in some detail. A few points worth adding:
ClientCnxn initialization
cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
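(A note on the two signatures above: as far as I can tell from the source, the call in the ZooKeeper constructor goes through a shorter ClientCnxn overload without the sessionId/sessionPasswd parameters, which delegates to this constructor with sessionId = 0 and an empty password.)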
You can see that one very important parameter of the ClientCnxn constructor is the ClientCnxnSocket, the class that actually handles the client-server connection. Let's look at how it is obtained.
private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
    // read the configured implementation class from the system properties
    String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
    // if nothing is configured, the NIO implementation is the default
    if (clientCnxnSocketName == null) {
        clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
    }
    try {
        // instantiate it via reflection
        return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).getDeclaredConstructor()
                .newInstance();
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate " + clientCnxnSocketName);
        ioe.initCause(e);
        throw ioe;
    }
}
As you can see, the ClientCnxnSocket instance is obtained via reflection, with ClientCnxnSocketNIO as the default implementation.
If you recall, this is the same class that sent the watcher packets earlier; in fact, all traffic between client and server goes through it, in particular via methods such as doTransport and doIO, which are built on NIO. I'll come back and fill in this part once I have a better grip on NIO and Netty.
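Since the implementation class is looked up from a system property, the transport can be swapped without touching ClientCnxn at all. A minimal sketch, assuming a ZooKeeper version that ships ClientCnxnSocketNetty (3.5+) and a server listening on 127.0.0.1:2181:

import org.apache.zookeeper.ZooKeeper;

public class NettyTransportDemo {
    public static void main(String[] args) throws Exception {
        // "zookeeper.clientCnxnSocket" is the property read by getClientCnxnSocket()
        System.setProperty("zookeeper.clientCnxnSocket",
                "org.apache.zookeeper.ClientCnxnSocketNetty");
        // with the property unset, ClientCnxnSocketNIO would be chosen by default
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });
        System.out.println("client state: " + zk.getState());
        zk.close();
    }
}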
Waiting inside StaticHostProvider
public InetSocketAddress next(long spinDelay) {
    // each attempt advances currentIndex by one
    ++currentIndex;
    // wrap around once every server has been tried
    if (currentIndex == serverAddresses.size()) {
        currentIndex = 0;
    }
    // after a full round without success, currentIndex == lastIndex
    if (currentIndex == lastIndex && spinDelay > 0) {
        try {
            Thread.sleep(spinDelay);
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    } else if (lastIndex == -1) {
        // We don't want to sleep on the first ever connect attempt.
        lastIndex = 0;
    }
    return serverAddresses.get(currentIndex);
}
This method looked odd at first: next() methods usually take no arguments, yet here there is a delay parameter. A closer look reveals the intent: if every server has been tried in a full round without a successful connection, the thread sleeps for spinDelay milliseconds before the next attempt, instead of spinning at full speed on an unreachable ensemble.
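As a concrete trace: with servers s1, s2, s3 in the connect string, successive next(1000) calls hand out s1, s2, s3, s1, and so on; once a whole round has been tried without a successful connection (currentIndex catches up with lastIndex), the call sleeps for the given spinDelay (startConnect below passes 1000 ms) before returning the next address, so a fully unreachable ensemble is retried at roughly one-second intervals.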
Creation Phase
As the previous post described, the creation process begins when the ZooKeeper constructor calls clientCnxn.start(), that is, when the SendThread and the EventThread start running.
public void start() {
sendThread.start();
eventThread.start();
}
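A small detail: both threads are created as daemon threads (their constructors call setDaemon(true)), so on their own they will not keep the client JVM alive.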
Establishing the connection is mainly the SendThread's job: calling start() effectively kicks off SendThread.run(). That method is fairly involved, so we will begin with SendThread's other methods and walk through run() last; that order keeps things clearer. SendThread's responsibilities were described before; here is a summary quoted from another write-up, useful to keep in mind while reading the methods below:
- It maintains the session lifecycle between client and server (sending PING packets to the server at a regular frequency as a heartbeat); if the TCP connection breaks during the session, it automatically and transparently reconnects.
- It manages all of the client's request sending and response receiving, converting upper-layer client API operations into the corresponding protocol requests, sending them to the server, and completing returns for synchronous calls and callbacks for asynchronous ones.
- It hands events coming from the server over to the EventThread for processing.
Let's walk through a few of the more important methods:
sendPing
This method implements the first responsibility above: it keeps the ping, that is, the heartbeat, going between client and server.
private void sendPing() {
    lastPingSentNs = System.nanoTime(); // record when this ping was sent (nano time)
    RequestHeader h = new RequestHeader(-2, OpCode.ping); // build the special ping request header (xid -2)
    queuePacket(h, null, null, null, null, null, null, null, null); // enqueue the packet onto outgoingQueue
}
One thing worth noting here:
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
This fragment is from ClientCnxnSocketNIO's doIO method. The point is that if a packet's header is a ping, an auth packet, or null, the packet is not added to pendingQueue after being sent.
In SendThread.readResponse, ping and auth replies each get special handling; that method was analyzed in part 8.
if (replyHdr.getXid() == -2) {
    // -2 is the xid for pings
    ...
}
if (replyHdr.getXid() == -4) {
    // -4 is the xid for AuthPacket
    ...
}
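For reference, the special xid values that appear in this flow: -1 marks a notification from the server (a triggered watch), -2 a ping reply, -4 an auth reply, and -8 is the xid that primeConnection stamps on its SetWatches packets. Any other xid belongs to a normal request and must be matched against pendingQueue, as readResponse below shows.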
pingRwServer
This method runs when the client is connected to a read-only server: it probes the next server from the HostProvider to see whether it is a read/write one.
private void pingRwServer() throws RWServerFoundException {
    String result = null;
    InetSocketAddress addr = hostProvider.next(0); // next server address
    LOG.info("Checking server " + addr + " for being r/w." +
            " Timeout " + pingRwTimeout);
    Socket sock = null;
    BufferedReader br = null;
    try {
        // set up the probe socket
        sock = new Socket(addr.getHostName(), addr.getPort());
        sock.setSoLinger(false, -1);
        sock.setSoTimeout(1000);
        sock.setTcpNoDelay(true);
        sock.getOutputStream().write("isro".getBytes());
        sock.getOutputStream().flush();
        sock.shutdownOutput();
        br = new BufferedReader(
                new InputStreamReader(sock.getInputStream())); // open the reply stream
        result = br.readLine(); // read the reply
    } catch (ConnectException e) {
        // ignore, this just means server is not up
    } catch (IOException e) {
        // some unexpected error, warn about it
        LOG.warn("Exception while seeking for r/w server " +
                e.getMessage(), e);
    } finally {
        // cleanup guards
        if (sock != null) {
            try {
                sock.close();
            } catch (IOException e) {
                LOG.warn("Unexpected exception", e);
            }
        }
        if (br != null) {
            try {
                br.close();
            } catch (IOException e) {
                LOG.warn("Unexpected exception", e);
            }
        }
    }
    // the probed server turned out to be read/write
    if ("rw".equals(result)) {
        pingRwTimeout = minPingRwTimeout;
        // save the found address so that it's used during the next
        // connection attempt
        rwServerAddress = addr; // remember this server: update rwServerAddress
        throw new RWServerFoundException("Majority server found at "
                + addr.getHostName() + ":" + addr.getPort()); // an r/w server was found; run() catches this and reconnects to rwServerAddress
    }
}
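Incidentally, "isro" is one of ZooKeeper's four-letter-word commands, so the same probe can be run by hand, e.g. echo isro | nc 127.0.0.1 2181 prints "rw" or "ro" (on newer versions the command may first need to be whitelisted via 4lw.commands.whitelist).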
The corresponding exception handling looks like this:
else if (e instanceof RWServerFoundException) {
    LOG.info(e.getMessage());
} else {
    LOG.warn(
            "Session 0x"
                    + Long.toHexString(getSessionId())
                    + " for server "
                    + clientCnxnSocket.getRemoteSocketAddress()
                    + ", unexpected error"
                    + RETRY_CONN_MSG, e);
}
cleanup();
if (state.isAlive()) {
    eventThread.queueEvent(new WatchedEvent(
            Event.EventType.None,
            Event.KeeperState.Disconnected,
            null)); // a Disconnected event is queued onto waitingEvents, and a reconnect follows
}
So after the Disconnected event is queued here, the SendThread loop re-establishes the connection, and the server it tries first is exactly the rwServerAddress saved above (see startConnect below).
startConnect
As the name makes plain, this method initiates the connection.
private void startConnect() throws IOException {
    // initializing it for new connection
    saslLoginFailed = false;
    state = States.CONNECTING;
    InetSocketAddress addr; // address to connect the socket to
    if (rwServerAddress != null) {
        addr = rwServerAddress; // prefer the saved read/write server address
        rwServerAddress = null; // clear it, so that after a disconnect the next attempt can pick a different server
    } else {
        addr = hostProvider.next(1000); // otherwise take the next server from the HostProvider
    }
    setName(getName().replaceAll("\\(.*\\)",
            "(" + addr.getHostName() + ":" + addr.getPort() + ")")); // rename the thread after the target server
    if (ZooKeeperSaslClient.isEnabled()) { // SASL is enabled; worth a closer look some other time
        try {
            // initialize the SASL username and client
            String principalUserName = System.getProperty(
                    ZK_SASL_CLIENT_USERNAME, "zookeeper");
            zooKeeperSaslClient =
                    new ZooKeeperSaslClient(
                            principalUserName + "/" + addr.getHostName());
        } catch (LoginException e) {
            // An authentication error occurred when the SASL client tried to initialize:
            // for Kerberos this means that the client failed to authenticate with the KDC.
            // This is different from an authentication error that occurs during communication
            // with the Zookeeper server, which is handled below.
            LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
                    + "SASL authentication, if Zookeeper server allows it.");
            eventThread.queueEvent(new WatchedEvent(
                    Watcher.Event.EventType.None,
                    Watcher.Event.KeeperState.AuthFailed, null));
            saslLoginFailed = true;
        }
    }
    logStartConnect(addr); // log it
    clientCnxnSocket.connect(addr); // start the socket connection
}
In summary, the main steps are:
- initialize the related state;
- determine the server address to connect to;
- SASL handling: initialization and error handling;
- logging;
- connect.
primeConnection
First, a quick look at the code:
void primeConnection() throws IOException {
    LOG.info("Socket connection established to "
            + clientCnxnSocket.getRemoteSocketAddress()
            + ", initiating session");
    isFirstConnect = false;
    // seenRwServerBefore becomes true the first time an r/w server is connected
    long sessId = (seenRwServerBefore) ? sessionId : 0; // reuse the old session id if an r/w server was connected before, otherwise 0
    ConnectRequest conReq = new ConnectRequest(0, lastZxid,
            sessionTimeout, sessId, sessionPasswd); // build the ConnectRequest
    synchronized (outgoingQueue) {
        // We add backwards since we are pushing into the front
        // Only send if there's a pending watch
        // TODO: here we have the only remaining use of zooKeeper in
        // this class. It's to be eliminated!
        // after reconnecting to an r/w server, push all watch and auth state onto outgoingQueue so the server can be brought back in sync
        if (!disableAutoWatchReset) { // is automatic watch reset enabled?
            List<String> dataWatches = zooKeeper.getDataWatches();
            List<String> existWatches = zooKeeper.getExistWatches();
            List<String> childWatches = zooKeeper.getChildWatches();
            if (!dataWatches.isEmpty()
                    || !existWatches.isEmpty() || !childWatches.isEmpty()) {
                Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
                Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                long setWatchesLastZxid = lastZxid;
                // iterate over the watch sets
                while (dataWatchesIter.hasNext()
                        || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
                    List<String> dataWatchesBatch = new ArrayList<String>();
                    List<String> existWatchesBatch = new ArrayList<String>();
                    List<String> childWatchesBatch = new ArrayList<String>();
                    int batchLength = 0;
                    // Note, we may exceed our max length by a bit when we add the last
                    // watch in the batch. This isn't ideal, but it makes the code simpler.
                    // each batch is capped at 128 kB
                    while (batchLength < SET_WATCHES_MAX_LENGTH) {
                        final String watch;
                        if (dataWatchesIter.hasNext()) {
                            watch = dataWatchesIter.next();
                            dataWatchesBatch.add(watch);
                        } else if (existWatchesIter.hasNext()) {
                            watch = existWatchesIter.next();
                            existWatchesBatch.add(watch);
                        } else if (childWatchesIter.hasNext()) {
                            watch = childWatchesIter.next();
                            childWatchesBatch.add(watch);
                        } else {
                            break;
                        }
                        batchLength += watch.length();
                    }
                    // build the SetWatches request
                    SetWatches sw = new SetWatches(setWatchesLastZxid,
                            dataWatchesBatch,
                            existWatchesBatch,
                            childWatchesBatch);
                    RequestHeader h = new RequestHeader();
                    h.setType(ZooDefs.OpCode.setWatches); // set the request type
                    h.setXid(-8);
                    Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
                    outgoingQueue.addFirst(packet); // push onto outgoingQueue
                }
            }
        }
        // push the auth data onto outgoingQueue
        for (AuthData id : authInfo) {
            outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
                    OpCode.auth), null, new AuthPacket(0, id.scheme,
                    id.data), null, null));
        }
        outgoingQueue.addFirst(new Packet(null, null, conReq,
                null, null, readOnly));
    }
    // enable reads and writes (start sending)
    clientCnxnSocket.enableReadWriteOnly();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request sent on "
                + clientCnxnSocket.getRemoteSocketAddress()); // log it
    }
}
As you can see, primeConnection's job is essentially to resynchronize the watch and auth state after (re)connecting to an r/w server. Three steps: 1. choose the session id; 2. build the synchronization packets and put them on outgoingQueue; 3. enable reads and writes.
One detail deserves emphasis:
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
After the auth data has been queued, the packet carrying the ConnectRequest is pushed to the very front of the queue, ensuring the connect request is the first thing sent (which guarantees the first response is the one handled by ClientCnxnSocket#readConnectResult). This follows the blog I referenced, but I initially wondered: how is it guaranteed that the first packet received belongs to this connect request? The answer is ordering: this addFirst call runs last, so the connect packet sits at the head of outgoingQueue and is written first; TCP preserves ordering, and the server processes a connection's requests sequentially and replies in order, so the first response on the wire must answer the first request sent.
On the relationship between startConnect and primeConnection (quoted):
The difference between the two lies in the NIO SelectionKey.
The former is limited to connect and accept.
The latter runs once the connection has completed and enables write and read, ready to start doing socket I/O with the zk server.
There is one blog post that describes the overall creation flow rather well; quoting it here:
- Start the SendThread.
- Connect to a server (see SendThread.startConnect).
- Create the actual socket, see ClientCnxnSocketNIO.createSock.
- Register an OP_CONNECT event with the selector and connect to the server. Since the connect is non-blocking, it may not complete immediately; if it does, SendThread.primeConnection is called to initialize the connection and register read/write events, otherwise the connect event is handled in a subsequent select poll.
- Reset the socket's incomingBuffer.
- Once connected, a connect-type request is sent to the server to obtain this connection's sessionid.
- Enter the loop waiting for requests from the application; if there are none, ping the server based on the elapsed time.
The reason for quoting it is that it nicely separates startConnect from primeConnection: in step two, startConnect establishes the connection and primeConnection is then invoked. startConnect covers the connect stage, while primeConnection initializes the session, watch and auth information, and registers ClientCnxnSocketNIO's interest in read and write events.
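To make the two stages tangible, here is a minimal, self-contained NIO sketch of that flow (my illustration, not ZooKeeper's actual code; ZK's version lives in ClientCnxnSocketNIO.createSock/registerAndConnect): register OP_CONNECT for the non-blocking connect, and only once the connection has completed switch the interest set to reads and writes, which is the point where primeConnection runs.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class NonBlockingConnectSketch {
    void connect(Selector selector, InetSocketAddress addr) throws IOException {
        SocketChannel sock = SocketChannel.open();
        sock.configureBlocking(false);          // non-blocking, as in createSock
        sock.socket().setTcpNoDelay(true);
        boolean connected = sock.connect(addr); // may or may not complete immediately
        if (connected) {
            // connected at once: session init (primeConnection) can run now,
            // and from here on only read/write readiness matters
            sock.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        } else {
            // otherwise wait for OP_CONNECT in the select loop and call
            // finishConnect() there before switching to read/write interest
            sock.register(selector, SelectionKey.OP_CONNECT);
        }
    }
}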
onConnected
As the comment and the method name make clear, this is the callback invoked once the socket connection has been established.
/**
 * Callback invoked by the ClientCnxnSocket once a connection has been
 * established.
 *
 * @param _negotiatedSessionTimeout
 * @param _sessionId
 * @param _sessionPasswd
 * @param isRO
 * @throws IOException
 */
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
        byte[] _sessionPasswd, boolean isRO) throws IOException {
    negotiatedSessionTimeout = _negotiatedSessionTimeout; // record the negotiated timeout
    if (negotiatedSessionTimeout <= 0) { // the server did not accept the session (it has expired)
        state = States.CLOSED; // state -> CLOSED
        eventThread.queueEvent(new WatchedEvent(
                Watcher.Event.EventType.None,
                Watcher.Event.KeeperState.Expired, null));
        eventThread.queueEventOfDeath(); // queue the Expired event, then the event-of-death, onto waitingEvents
        String warnInfo;
        warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
                + Long.toHexString(sessionId) + " has expired";
        LOG.warn(warnInfo);
        throw new SessionExpiredException(warnInfo); // log and throw
    }
    if (!readOnly && isRO) {
        LOG.error("Read/write client got connected to read-only server");
    }
    readTimeout = negotiatedSessionTimeout * 2 / 3; // why is the read timeout 2/3 of the negotiated timeout? see the worked example below
    connectTimeout = negotiatedSessionTimeout / hostProvider.size(); // split the timeout budget evenly across the servers
    hostProvider.onConnected(); // update the HostProvider's internal state
    sessionId = _sessionId;
    sessionPasswd = _sessionPasswd;
    state = (isRO) ?
            States.CONNECTEDREADONLY : States.CONNECTED; // record the connection state and session info
    seenRwServerBefore |= !isRO;
    LOG.info("Session establishment complete on server "
            + clientCnxnSocket.getRemoteSocketAddress()
            + ", sessionid = 0x" + Long.toHexString(sessionId)
            + ", negotiated timeout = " + negotiatedSessionTimeout
            + (isRO ? " (READ-ONLY mode)" : "")); // log it
    KeeperState eventState = (isRO) ?
            KeeperState.ConnectedReadOnly : KeeperState.SyncConnected; // read-only connection or not
    eventThread.queueEvent(new WatchedEvent(
            Watcher.Event.EventType.None,
            eventState, null)); // queue the connection-success event
}
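A worked example of the two formulas above: with a negotiated session timeout of 30000 ms and 3 servers in the connect string, readTimeout = 30000 * 2/3 = 20000 ms and connectTimeout = 30000 / 3 = 10000 ms. My reading of the 2/3 factor (the code does not spell it out): keeping a third of the session timeout in reserve gives the client time to notice the silence, try another server and resume the session before it actually expires on the server side.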
readResponse
This method is fairly long, so let's analyze it piece by piece.
ByteBufferInputStream bbis = new ByteBufferInputStream(
        incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); // archive used for deserialization
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header"); // parse out the reply header
Once the reply header has been parsed, the dispatch logic begins.
if (replyHdr.getXid() == -2) { // a ping reply
    // -2 is the xid for pings
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got ping response for sessionid: 0x"
                + Long.toHexString(sessionId)
                + " after "
                + ((System.nanoTime() - lastPingSentNs) / 1000000)
                + "ms"); // log it
    }
    return;
}
if (replyHdr.getXid() == -4) { // an auth reply
    // -4 is the xid for AuthPacket
    if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { // authentication failed
        state = States.AUTH_FAILED; // update the state
        eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                Watcher.Event.KeeperState.AuthFailed, null)); // queue an AuthFailed event onto waitingEvents
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got auth sessionid:0x"
                + Long.toHexString(sessionId)); // log it
    }
    return;
}
if (replyHdr.getXid() == -1) { // a notification
    // -1 means notification
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got notification sessionid:0x"
                + Long.toHexString(sessionId)); // log it
    }
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response");
    // convert from a server path to a client path
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if (serverPath.compareTo(chrootPath) == 0)
            event.setPath("/"); // the chrootPath itself maps to the client's root
        else if (serverPath.length() > chrootPath.length())
            event.setPath(serverPath.substring(chrootPath.length())); // strip the chroot prefix
        else {
            LOG.warn("Got server path " + event.getPath()
                    + " which is too short for chroot path "
                    + chrootPath); // the server path is shorter than the chroot path, which should not happen; log it
        }
    }
    WatchedEvent we = new WatchedEvent(event); // turn the WatcherEvent into a WatchedEvent
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got " + we + " for sessionid 0x"
                + Long.toHexString(sessionId));
    }
    eventThread.queueEvent(we); // queue it onto waitingEvents
    return;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
// while the SASL exchange is in progress, respond to the server right away instead of queuing
if (clientTunneledAuthenticationInProgress()) {
    GetSASLRequest request = new GetSASLRequest();
    request.deserialize(bbia, "token");
    zooKeeperSaslClient.respondToServer(request.getToken(),
            ClientCnxn.this);
    return;
}
Packet packet;
synchronized (pendingQueue) { // now handle pendingQueue
    // as the code above showed, auth and ping packets and the in-flight SASL exchange never reach
    // pendingQueue (they return early), and triggered watches are server-initiated notifications
    // looked up in the watch manager rather than pending requests; asynchronous AsyncCallback
    // requests, however, are in here
    if (pendingQueue.size() == 0) {
        throw new IOException("Nothing in the queue, but got "
                + replyHdr.getXid()); // nothing to match this reply against
    }
    packet = pendingQueue.remove(); // take the head packet
}
/*
 * Since requests are processed in order, we better get a response
 * to the first request!
 */
try {
    if (packet.requestHeader.getXid() != replyHdr.getXid()) { // out-of-order reply
        packet.replyHeader.setErr(
                KeeperException.Code.CONNECTIONLOSS.intValue());
        throw new IOException("Xid out of order. Got Xid "
                + replyHdr.getXid() + " with err " +
                + replyHdr.getErr() +
                " expected Xid "
                + packet.requestHeader.getXid()
                + " for a packet with details: "
                + packet);
    }
    // copy the reply attributes onto the packet
    packet.replyHeader.setXid(replyHdr.getXid());
    packet.replyHeader.setErr(replyHdr.getErr());
    packet.replyHeader.setZxid(replyHdr.getZxid());
    if (replyHdr.getZxid() > 0) {
        lastZxid = replyHdr.getZxid();
    }
    if (packet.response != null && replyHdr.getErr() == 0) {
        packet.response.deserialize(bbia, "response"); // deserialize the response body
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Reading reply sessionid:0x"
                + Long.toHexString(sessionId) + ", packet:: " + packet); // log it
    }
} finally {
    finishPacket(packet); // queue onto waitingEvents; as discussed for the watcher mechanism, this is where a registered watcher's packet ends up
}
To summarize:
- deserialize the response;
- dispatch on the reply header: ping, auth and SASL replies are handled inline and return without entering the waitingEvents queue;
- a notification from the server is an event and gets queued;
- otherwise the reply is matched against the already-sent packet at the head of pendingQueue.
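Putting the queues together, the life of an ordinary request looks like this: the API call builds the request and queuePacket appends it to outgoingQueue; doTransport/doIO writes it to the socket and moves it onto pendingQueue; readResponse matches the server's reply against the head of pendingQueue and fills in the reply header and body; finally finishPacket completes the packet, waking the synchronous caller or queueing it for the EventThread to fire the asynchronous callback.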
run
run is the core of the SendThread, and of connection establishment as a whole. It is fairly long, so let's go through it section by section.
@Override
public void run() {
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
This opening fragment is plainly just clientCnxnSocket-related initialization.
while (state.isAlive()) {
    try {
        if (!clientCnxnSocket.isConnected()) { // not connected yet
            if (!isFirstConnect) {
                try {
                    Thread.sleep(r.nextInt(1000));
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected exception", e);
                }
            }
            // don't re-establish connection if we are closing
            if (closing || !state.isAlive()) { // this matters at the end; see (1) below
                break;
            }
            startConnect(); // connect
            clientCnxnSocket.updateLastSendAndHeard();
        }
Next the loop checks the connection state: if there is no connection yet, startConnect() is called to connect to a server; once connected, pings are sent periodically as a heartbeat.
if (state.isConnected()) {
    // determine whether we need to send an AuthFailed event.
    if (zooKeeperSaslClient != null) {
        boolean sendAuthEvent = false;
        if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) { // SASL state
            try {
                zooKeeperSaslClient.initialize(ClientCnxn.this); // SASL initialization; to study later
            } catch (SaslException e) {
                LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                state = States.AUTH_FAILED;
                sendAuthEvent = true;
            }
        }
        KeeperState authState = zooKeeperSaslClient.getKeeperState(); // auth-related connection state
        if (authState != null) {
            if (authState == KeeperState.AuthFailed) { // authentication failed
                // An authentication error occurred during authentication with the Zookeeper Server.
                state = States.AUTH_FAILED;
                sendAuthEvent = true;
            } else {
                if (authState == KeeperState.SaslAuthenticated) {
                    sendAuthEvent = true;
                }
            }
        }
        if (sendAuthEvent == true) {
            eventThread.queueEvent(new WatchedEvent(
                    Watcher.Event.EventType.None,
                    authState, null));
        }
    }
    // readTimeout = 2/3 of the session timeout
    to = readTimeout - clientCnxnSocket.getIdleRecv(); // connected: read-timeout budget minus the time since the last read
} else {
    to = connectTimeout - clientCnxnSocket.getIdleRecv(); // not connected: connect-timeout budget minus the time since the last read; either way, "to" tells us whether the expected timeout has been exceeded
}
if (to <= 0) { // timed out
    String warnInfo;
    warnInfo = "Client session timed out, have not heard from server in "
            + clientCnxnSocket.getIdleRecv()
            + "ms"
            + " for sessionid 0x"
            + Long.toHexString(sessionId);
    LOG.warn(warnInfo);
    throw new SessionTimeoutException(warnInfo); // log and throw
}
if (state.isConnected()) { // connected
    //1000(1 second) is to prevent race condition missing to send the second ping
    //also make sure not to send too many pings when readTimeout is small
    // compute the time until the next ping; the adjustment below keeps a small readTimeout from causing a ping flood
    int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
            ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
    //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
    if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { // half the readTimeout has elapsed, or nothing has been sent for over ten seconds
        sendPing(); // send a ping
        clientCnxnSocket.updateLastSend();
    } else {
        if (timeToNextPing < to) { // wake up in time for the next ping
            to = timeToNextPing;
        }
    }
}
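To make the ping cadence concrete: with readTimeout = 20000 ms and an idle send channel (getIdleSend() == 0), timeToNextPing = 20000 / 2 - 0 - 0 = 10000 ms, so pings go out roughly every half readTimeout, i.e. about every third of the session timeout. The extra 1000 ms subtracted once getIdleSend() exceeds one second makes the next ping fire slightly early, which is what the in-code comment about not missing the second ping refers to.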
Next comes a decision based on the kind of server the client is connected to: on a read-only server, the client automatically goes looking for a read/write one.
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) { // CONNECTEDREADONLY means the current server is read-only
    long now = Time.currentElapsedTime();
    int idlePingRwServer = (int) (now - lastPingRwServer); // time since the last r/w-server probe
    if (idlePingRwServer >= pingRwTimeout) {
        lastPingRwServer = now; // update the probe timestamp
        idlePingRwServer = 0;
        pingRwTimeout =
                Math.min(2 * pingRwTimeout, maxPingRwTimeout); // back off the probe interval exponentially
        pingRwServer(); // try to find a read/write server
    }
    to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
Then the requests queued on outgoingQueue are sent and the server's replies are read:
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
Finally there is some cleanup and the handling of disconnects. At this point the loop above has been exited; a few spots deserve attention:
cleanup(); // clean up the connection and the various queues
clientCnxnSocket.close(); // close the connection
if (state.isAlive()) { // (1)
    eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
            Event.KeeperState.Disconnected, null)); // queue a Disconnected event
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
        "SendThread exited loop for session: 0x"
                + Long.toHexString(getSessionId())); // log it
(1) The loop condition is already while (state.isAlive()), so why test the state again after the loop has been exited? Because of this early exit inside the loop body:
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
At the top of each loop iteration the code checks closing; closing is set to true only after the client has actively submitted a close-session request. So if the while loop exits with the state still alive, the only possibility is that the client itself initiated the disconnect, and in that case a Disconnected event is handed to the EventThread to process.
To summarize, run does the following:
- initializes clientCnxnSocket and the related parameters;
- tries to connect whenever the client is not connected to a server;
- connected or not, checks whether the connection has timed out;
- once connected, sends periodic heartbeats to probe the connection to the server;
- when connected to a read-only server, actively seeks out a read/write one;
- sends the requests on outgoingQueue and receives the server's replies;
- handles the cleanup around disconnects.
That essentially wraps up the creation phase. The main flow and its handling should be clear by now, but the process hides a great many details that will be worth revisiting whenever they come up in practice.
Thoughts
Ordering of request and response in primeConnection
As discussed above: how is the ordering guaranteed? (See the argument under primeConnection: head-of-queue insertion plus in-order processing on a single TCP connection.)
R/W and read-only modes
Under what conditions does a server run in each of these two modes?
How the session mechanism works
sessId vs. sessionId
sessId is really just the sessionId. seenRwServerBefore is set to true the first time the client connects to a read/write server; before any connection exists sessId is 0, so the ConnectRequest built for the very first connection carries sessionId 0.
Requests that never enter pendingQueue
auth and ping packets, and the in-flight SASL exchange, are not added to pendingQueue; triggered watches are not in it either. As the notification-event discussion in the first reference of the previous post explains, a triggered watch is a proactive notification from the server, looked up in the watch manager, so it never sits in pendingQueue. As for auth and ping, part 8 left the special replyHdr xids as an open question; it is now clear those values identify the auth and ping replies.
sendPing and pingRwServer
The former is the heartbeat; the latter, once the client is connected to a read-only server, tries to find a read/write one.
Overall connection process
Connecting to the ZooKeeper service involves establishing two layers of connection.
The TCP connection between client and server
- On top of the TCP connection, a session association is established: once the TCP connection is up, the client sends a ConnectRequest to request a session; the server assigns the client a sessionId and a password, and starts checking that session for timeouts.
- If the TCP connection drops while still within sessionTimeout, the server continues to consider the sessionId alive. The client then picks the next ZooKeeper server address and establishes a new TCP connection; once that is up, it sends a ConnectRequest carrying the previous sessionId and password, and if the session has not yet timed out, the automatic reconnect succeeds. All of this is transparent to the client application: everything happens quietly in the background, and the ZooKeeper object remains valid.
If, by the time the new TCP connection is established, the sessionId has already timed out (the server will have cleaned up all data tied to it), the server returns a sessionTimeout of 0, a sessionid of 0 and an empty password byte array. On receiving this, the client checks whether the negotiated sessionTimeout is less than or equal to 0; if so, the EventThread first emits a KeeperState.Expired event to notify the relevant watchers, then ends its loop and shuts down. At that point the ZooKeeper object is invalid; a new ZooKeeper object must be created, which will be assigned a fresh sessionId.
References
https://www.jianshu.com/p/f69e6de5f169
http://www.cnblogs.com/leesf456/p/6098255.html
https://my.oschina.net/pingpangkuangmo/blog/486780
https://blog.csdn.net/cnh294141800/article/details/53039482
https://www.cnblogs.com/shangxiaofei/p/7171882.html
《從Paxos到Zookeeper》 (From Paxos to ZooKeeper)