Zookeeper 通知更新可靠嗎? 解讀源碼找答案!
本文由特魯門發表於雲+社區專欄
導讀:
遇到Keepper通知更新無法收到的問題,思考節點變更通知的可靠性,通過閱讀源碼解析了解到zk Watch的註冊以及觸發的機制,本地調試運行模擬zk更新的不可靠的場景以及得出相應的解決方案。
過程很曲折,但問題的根本原因也水落石出了,本文最後陳述了更新無法收到的根本原因,希望對其他人有所幫助。-----------------------------------------
通常Zookeeper是作為配置存儲、分布式鎖等功能被使用,配置讀取如果每一次都是去Zookeeper server讀取效率是非常低的,幸好Zookeeper提供節點更新的通知機制,只需要對節點設置Watch監聽,節點的任何更新都會以通知的方式發送到Client端。
如上圖所示:應用Client通常會連接上某個ZkServer,forPath不僅僅會讀取Zk 節點zkNode的數據(通常存儲讀取到的數據會存儲在應用內存中,例如圖中Value),而且會設置一個Watch,當zkNode節點有任何更新時,ZkServer會發送notify,Client運行Watch來才走出相應的事件相應。這裏假設操作為更新Client本地的數據。這樣的模型使得配置異步更新到Client中,而無需Client每次都遠程讀取,大大提高了讀的性能,(圖中的re-regist重新註冊是因為對節點的監聽是一次性的,每一次通知完後,需要重新註冊)。但這個Notify是可靠的嗎?如果通知失敗,那豈不是Client永遠都讀取的本地的未更新的值?
由於現網環境定位此類問題比較困難,因此本地下載源碼並模擬運行ZkServer & ZkClient來看通知的發送情況。
1、git 下載源碼 https://github.com/apache/zookeeper
2、cd 到路徑下,運行ant eclipse 加載工程的依賴。
3、導入Idea中。
<https://stackoverflow.com/questions/43964547/how-to-import-zookeeper-source-code-to-idea>
查看相關問題和步驟。
首先運行ZkServer。QuorumPeerMain是Server的啟動類。這個可以根據bin下ZkServer.sh找到入口。註意啟動參數配置參數文件,指定例如啟動端口等相關參數。
在此之前,需要設置相關的斷點。
首先我們要看client設置監聽後,server是如何處理的
ZkClient 是使用Nio的方式與ZkServer進行通信的,Zookeeper的線程模型中使用兩個線程:
SendThread專門成立的請求的發送,請求會被封裝為Packet(包含節點名稱、Watch描述等信息)類發送給Sever。
EventThread則專門處理SendThread接收後解析出的Event。
ZkClient 的主要有兩個Processor,一個是SycProcessor負責Cluster之間的數據同步(包括集群leader選取)。另一個是叫FinalRuestProcessor,專門處理對接受到的請求(Packet)進行處理。
//ZookeeperServer 的processPacket方法專門對收到的請求進行處理。
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
// Through the magic of byte buffers, txn will not be
// pointing
// to the start of the txn
incomingBuffer = incomingBuffer.slice();
//鑒權請求處理
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
AuthPacket authPacket = new AuthPacket();
ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
Code authReturn = KeeperException.Code.AUTHFAILED;
if(ap != null) {
try {
authReturn = ap.handleAuthentication(new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth());
} catch(RuntimeException e) {
LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn == KeeperException.Code.OK) {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication succeeded for scheme: " + scheme);
}
LOG.info("auth success " + cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
} else {
if (ap == null) {
LOG.warn("No authentication provider for scheme: "
+ scheme + " has "
+ ProviderRegistry.listProviders());
} else {
LOG.warn("Authentication failed for scheme: " + scheme);
}
// send a response...
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.AUTHFAILED.intValue());
cnxn.sendResponse(rh, null, null);
// ... and close connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
cnxn.disableRecv();
}
return;
} else {
if (h.getType() == OpCode.sasl) {
Record rsp = processSasl(incomingBuffer,cnxn);
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
return;
}
else {
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
// Always treat packet from the client as a possible
// local request.
setLocalSessionFlag(si);
//交給finalRequestProcessor處理
submitRequest(si);
}
}
cnxn.incrOutstandingRequests(h);
}
FinalRequestProcessor 對請求進行解析,Client連接成功後,發送的exist命令會落在這部分處理邏輯。
zkDataBase 由zkServer從disk持久化的數據建立而來,上圖可以看到這裏就是添加監聽Watch的地方。
然後我們需要了解到,當Server收到節點更新事件後,是如何觸發Watch的。
首先了解兩個概念,FinalRequestProcessor處理的請求分為兩種,一種是事務型的,一種非事務型,exist 的event-type是一個非事物型的操作,上面代碼中是對其處理邏輯,對於事物的操作,例如SetData的操作。則在下面代碼中處理。
private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
Record txn) {
ProcessTxnResult rc;
int opCode = request != null ? request.type : hdr.getType();
long sessionId = request != null ? request.sessionId : hdr.getClientId();
if (hdr != null) {
//hdr 為事物頭描述,例如SetData的操作就會被ZkDataBase接管操作,
//因為是對Zk的數據存儲機型修改
rc = getZKDatabase().processTxn(hdr, txn);
} else {
rc = new ProcessTxnResult();
}
if (opCode == OpCode.createSession) {
if (hdr != null && txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
} else if (request != null && request.isLocalSession()) {
request.request.rewind();
int timeout = request.request.getInt();
request.request.rewind();
sessionTracker.addSession(request.sessionId, timeout);
} else {
LOG.warn("*****>>>>> Got "
+ txn.getClass() + " "
+ txn.toString());
}
} else if (opCode == OpCode.closeSession) {
sessionTracker.removeSession(sessionId);
}
return rc;
}
這裏設置了斷點,就可以攔截對節點的更新操作。
這兩個設置了斷點,就可以了解到Watch的設置過程。
接下來看如何啟動Zookeeper的Client。ZookeeperMain為Client的入口,同樣在bin/zkCli.sh中可以找到。註意設置參數,設置Server的連接地址。
修改ZookeeperMain方法,設置對節點的Watch監聽。
public ZooKeeperMain(String args[]) throws IOException, InterruptedException, KeeperException {
cl.parseOptions(args);
System.out.println("Connecting to " + cl.getOption("server"));
connectToZK(cl.getOption("server"));
while (true) {
// 模擬註冊對/zookeeper節點的watch監聽
zk.exists("/zookeeper", true);
System.out.println("wait");
}
}
啟動Client。
由於我們要觀察節點變更的過程,上面這個Client設置了對節點的監聽,那麽我們需要另外一個cleint對節點進行更改,這個我們只需要在命令上進行就可以了。
此時命令行的zkClient更新了/zookeeper節點,Server此時會停在setData事件的處理代碼段。
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
byte lastdata[] = null;
synchronized (n) {
lastdata = n.data;
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
// now update if the path is in a quota subtree.
String lastPrefix = getMaxPrefixWithQuota(path);
if(lastPrefix != null) {
this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
- (lastdata == null ? 0 : lastdata.length));
}
//觸發watch監聽
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
此時,我們重點關註的類出現了。WatchManager
package org.apache.zookeeper.server;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class manages watches. It allows watches to be associated with a string
* and removes watchers and their watches in addition to managing triggers.
*/
class WatchManager {
private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
//存儲path對watch的關系
private final Map<String, Set<Watcher>> watchTable =
new HashMap<String, Set<Watcher>>();
//存儲watch監聽了哪些path節點
private final Map<Watcher, Set<String>> watch2Paths =
new HashMap<Watcher, Set<String>>();
synchronized int size(){
int result = 0;
for(Set<Watcher> watches : watchTable.values()) {
result += watches.size();
}
return result;
}
//添加監聽
synchronized void addWatch(String path, Watcher watcher) {
Set<Watcher> list = watchTable.get(path);
if (list == null) {
// don‘t waste memory if there are few watches on a node
// rehash when the 4th entry is added, doubling size thereafter
// seems like a good compromise
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);
Set<String> paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}
//移除
synchronized void removeWatcher(Watcher watcher) {
Set<String> paths = watch2Paths.remove(watcher);
if (paths == null) {
return;
}
for (String p : paths) {
Set<Watcher> list = watchTable.get(p);
if (list != null) {
list.remove(watcher);
if (list.size() == 0) {
watchTable.remove(p);
}
}
}
}
Set<Watcher> triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
}
//觸發watch
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
Set<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
Set<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
//通知發送
w.process(e);
}
return watchers;
}
}
重點關註triggerWatch的方法,可以發現watch被移除後,即往watch中存儲的client信息進行通知發送。
@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
sendResponse(h, e, "notification");
}
沒有任何確認機制,不會由於發送失敗,而回寫watch。
結論:
到這裏,可以知道watch的通知機制是不可靠的,zkServer不會保證通知的可靠抵達。雖然zkclient與zkServer端是會有心跳機制保持鏈接,但是如果通知過程中斷開,即時重新建立連接後,watch的狀態是不會恢復。
現在已經知道了通知是不可靠的,會有丟失的情況,那ZkClient的使用需要進行修正。
本地的存儲不再是一個靜態的等待watch更新的狀態,而是引入緩存機制,定期的去從Zk主動拉取並註冊Watch(ZkServer會進行去重,對同一個Node節點的相同時間類型的Watch不會重復)。
另外一種方式是,Client端收到斷開連接的通知,重新註冊所有關註節點的Watch。但作者遇到的現網情況是client沒有收到更新通知的同時,也沒有查看到連接斷開的錯誤信息。這塊仍需進一步確認。水平有限,歡迎指正 :D
在StackOverFlow上的提問有了新進展:
<https://stackoverflow.com/questions/49328151/is-zookeeper-node-change-notification-reliable-under-situations-of-connection-lo>
原來官方文檔已經解釋了在連接斷開的時候,client對watch的一些恢復操做,ps:原來上面我提到的客戶端的策略已經官方實現。。。
客戶端會通過心跳保活,如果發現斷開了連接,會重新建立連接,並發送之前對節點設置的watch以及節點zxid,如果zxid與服務端的小則說明斷開期間有更改,那麽server會觸發通知。
這麽來看,Zookeeper的通知機制至少在官方的文檔說明上是可靠的,至少是有相應機制去保證。ps:除Exist watch外。但是本人遇到的問題仍未解開。。後悔當初沒有保留現場,深入發掘。計劃先把實現改回原來的,後續進一步驗證。找到原因再更新這裏。
最終結論更新!
通過深入閱讀apache的zk論壇以及源碼,有一個重要的信息。
上面提到的連接斷開分為recoverble以及unrecoverble兩種場景,這兩種的區別主要是基於Session的有效期,所有的client操作包括watch都是和Session關聯的,當Session在超時過期時間內,重新成功建立連接,則watch會在連接建立後重新設置。但是當Session Timeout後仍然沒有成功重新建立連接,那麽Session則處於Expire的狀態。下面連接講述了這個過程
How should I handle SESSION_EXPIRED?
這種情況下,ZookeeperClient會重新連接,但是Session將會是全新的一個。同時之前的狀態是不會保存的。
private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
return;
}
switch (state) {
case AUTH_FAILED:
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
break;
case CLOSED:
// session關閉狀態,直接返回。
p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
break;
default:
p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
}
// 如果session未過期,這裏進行session的狀態(watches)會重新註冊。
finishPacket(p);
}
*1、什麽是zookeeper的會話過期?*
一般來說,我們使用zookeeper是集群形式,如下圖,client和zookeeper集群(3個實例)建立一個會話session。
在這個會話session當中,client其實是隨機與其中一個zk provider建立的鏈接,並且互發心跳heartbeat。zk集群負責管理這個session,並且在所有的provider上維護這個session的信息,包括這個session中定義的臨時數據和監視點watcher。
如果再網絡不佳或者zk集群中某一臺provider掛掉的情況下,有可能出現connection loss的情況,例如client和zk provider1連接斷開,這時候client不需要任何的操作(zookeeper api已經給我們做好了),只需要等待client與其他provider重新連接即可。這個過程可能導致兩個結果:
1)在session timeout之內連接成功
這個時候client成功切換到連接另一個provider例如是provider2,由於zk在所有的provider上同步了session相關的數據,此時可以認為無縫遷移了。
2)在session timeout之內沒有重新連接
這就是session expire的情況,這時候zookeeper集群會任務會話已經結束,並清除和這個session有關的所有數據,包括臨時節點和註冊的監視點Watcher。
在session超時之後,如果client重新連接上了zookeeper集群,很不幸,zookeeper會發出session expired異常,且不會重建session,也就是不會重建臨時數據和watcher。
我們實現的ZookeeperProcessor是基於Apache Curator的Client封裝實現的。
Apache Curator 錯誤處理機制
它對於Session Expire的處理是提供了處理的監聽註冊ConnectionStateListner,當遇到Session Expire時,執行使用者要做的邏輯。(例如:重新設置Watch)遺憾的是,我們沒有對這個事件進行處理,因此連接是一致斷開的,但是!我們應用仍然會讀到老的數據!
在這裏,我們又犯了另外一個錯誤,本地緩存了zookeeper的節點數據。。其實zookeeperClient已經做了本地緩存的機制,但是我們有加了一層(註:這裏也有一個原因,是因為zk節點的數據時二進制的數組,業務要使用通常要反序列化,我們這裏的緩存是為了減少反序列化帶來的開銷!),正式由於我們本地緩存了,因此即使zk斷開了,仍然讀取了老的值!
至此,謎團已經全部解開,看來之前的實現有許多姿勢是錯誤的,導致後續出現了各種奇怪的BUG 。現在處理的方案,是監聽Reconnect的通知,當收到這個通知後,主動讓本地緩存失效(這裏仍然做了緩存,是因為減少反序列化的開銷,zkClient的緩存只是緩存了二進制,每次拿出來仍然需要反序列化)。代碼:
curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
switch (newState) {
case CONNECTED:
break;
case RECONNECTED:
LOG.error("zookeeper connection reconnected");
System.out.println("zookeeper connection reconnected");
//本來使用invalidateAll,但是這個會使得cache所有緩存值同時失效
//如果關註節點比較多,導致同時請求zk讀值,可能服務會瞬時阻塞在這一步
//因此使用guava cache refresh方法,異步更新,更新過程中,
//老值返回,知道更新完成
for (String key : classInfoMap.keySet()) {
zkDataCache.refresh(key);
}
break;
case LOST:
// session 超時,斷開連接,這裏不要做任何操作,緩存保持使用
LOG.error("zookeeper connection lost");
System.out.println("zookeeper connection lost");
break;
case SUSPENDED:
break;
default:
break;
}
}
});
問答
如何閱讀Zookeeper事務日誌?
相關閱讀
Zookeeper總覽
ZooKeeper入門
zookeeper原理
【每日課程推薦】機器學習實戰!快速入門在線廣告業務及CTR相應知識
此文已由作者授權騰訊雲+社區發布,更多原文請點擊
搜索關註公眾號「雲加社區」,第一時間獲取技術幹貨,關註後回復1024 送你一份技術課程大禮包!
海量技術實踐經驗,盡在雲加社區!
Zookeeper 通知更新可靠嗎? 解讀源碼找答案!