運維平臺-cmdb配置庫
Zookeeper學習筆記
參考資料:
一、概述
1.1 Zookeeper偽叢集的搭建
這裡以個人電腦windwos平臺為例
首先下載zookeeper的安裝包我這裡是3.6.3版本,下載地址
解壓後,建立data和log目錄,如下圖:
然後在data和log目錄中建三個資料夾:
在data目錄的每一個資料夾中建立一個myid的檔案,然後檔案內容為id:
然後複製zoo_sample.cfg檔案,複製三份,並修改配置檔案內容,:
tickTime=2000 initLimit=10 syncLimit=5 dataDir=D:\zookeeper\apache-zookeeper-3.6.3-bin\data\zoo-1 dataLogDir=D:\zookeeper\apache-zookeeper-3.6.3-bin\log\zoo-1 # the port at which the clients will connect clientPort=2181 #在3.6.x以上版本zookeeper有一個管理後臺,預設佔用8080埠,所以需要手動修改 admin.serverPort=2666 server.1=127.0.0.1:2888:3888
進入bin目錄,複製zkServer.cmd,複製三分,並進行修改:
@echo off setlocal call "%~dp0zkEnv.cmd" set ZOOCFG=D:\zookeeper\apache-zookeeper-3.6.3-bin\conf\zoo-1.cfg set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain set ZOO_LOG_FILE=zookeeper-%USERNAME%-server-%COMPUTERNAME%.log echo on call %JAVA% "-Dzookeeper.admin.enableServer=false -Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" "-Dzookeeper.log.file=%ZOO_LOG_FILE%" "-XX:+HeapDumpOnOutOfMemoryError" "-XX:OnOutOfMemoryError=cmd /c taskkill /pid %%%%p /t /f" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %* endlocal
然後分別啟動三個cmd即可。到此zookeeper的偽叢集搭建完成。
1.2 Zookeeper儲存模型
zookeeper的儲存模式很簡單,與Linux的檔案系統類似。zookeeper是一顆以/
為根節點的樹,而其中的每一個節點叫ZNode,所有的節點通過樹結構按層次組織在一起,構成一個ZNode樹。每個節點都用一個完整的路徑唯一標識,比如/test/node1
。
通過ZNode樹,ZooKeeper提供一個多層級的樹狀名稱空間。該樹狀名稱空間與檔案的目錄系統中的目錄樹有所不同的是:這些ZNode節點可以儲存二進位制負載資料(Payload)。而檔案系統目錄樹中的目錄,只能存放路徑資訊,二不能存放負載資料。
一個節點的負載資料能放多少二進位制資料呢?ZooKeeper為了保證高吞吐和低延遲,整個樹狀的目錄結構全部都放在記憶體中。與硬碟和其他的外存裝置相比,機器的記憶體比較有限,使得ZooKeeper的目錄結構,不能用於存放大量的資料。ZooKeeper官方的要求是,每個節點存放的Payload負載資料的上限,僅僅為1M。
二、客戶端的使用
zookeeper的應用開發主要通過Java客戶端去連線和操作zookeeper叢集,目前的Java客戶端有:
- zookeeper官方的Java客戶端
- 第三方的客戶端API
ZooKeeper官方的客戶端API提供了基本的操作。比如,建立會話、建立節點、讀取節點、更新資料、刪除節點和檢查節點是否存在等。但對於實際開發來說,ZooKeeper官方API有一些不足之處:
- ZooKeeper的Watcher監測是一次性的,每次觸發之後都需要重新進行註冊;
- Session超時之後沒有實現重連機制;
- 異常處理繁瑣,ZooKeeper提供了很多異常,對於開發人員來說可能根本不知道該如何處理這些異常資訊;
- 只提供了簡單的byte[] 陣列型別的介面,沒有提供Java POJO級別的序列化資料處理介面;
- 建立節點時如果節點存在丟擲異常,需要自行檢查節點是否存在;
- 無法實現級聯刪除
因此我這裡採用第三方客戶端進行學習,第三方開源客戶端API主要有zkClient和Curator。我這裡使用Curator進行學習。
Curator是Netflix公司開源的一套ZooKeeper客戶端框架,和ZkClient一樣,Curator提供了非常底層的細節開發工作,包括Session會話超時重連、掉線重連、反覆註冊Watcher和NodeExistsException異常等。它是Apache基金會的頂級專案之一,Curator具有更加完善的文件,另外還提供了一套易用性和可讀性更強的Fluent風格的客戶端API框架。Curator還提供了ZooKeeper一些比較普遍的分散式開發的開箱即用的解決方案,比如Recipes、共享鎖服務、Master選舉機制和分散式計算器等,Java應用開發時在這些小的元件上可以不用重複造輪子。
2.1 建立Client
使用Curator首先需要引入依賴,我的zookeeper的版本是3.6.3,這裡引入以下版本Curator:
<!--curator版本需要與zookeeper版本匹配,這裡-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
然後我們就可以建立Curator客戶端了,Curator客戶端有兩種方式可以進行建立,一種是使用newClient方式,另一種是builder構造者方法。
這裡將兩種建立方式封裝成一個工廠類,程式碼如下:
public class ClientFactory {
/**
* 建立簡單的CuratorFramework例項
*
* @param connectionStr 連線地址
* @author Yuhaoran
* @date 2022/11/18 10:38
*/
public static CuratorFramework createSimple(String connectionStr) {
//重試策略,第一次重試等待1s,第二次重試等待2s,第三次4s。。。
//baseSleepTimeMs:等待時間的基礎單位,下面設定了1000ms
//maxRetries:重試次數
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);
//使用工廠類的靜態方法
//引數1:zk的連線地址;引數2:重試策略
return CuratorFrameworkFactory.newClient(connectionStr, retry);
}
/**
* 建立帶引數的CuratorFramework例項
*
* @param connStr 連線地址
* @param retryPolicy 重試策略
* @param connTimeoutMs 連線超時時長
* @param sessionTimeoutMs 會話超時時長
* @author Yuhaoran
* @date 2022/11/18 10:39
*/
public static CuratorFramework createWithOptions(String connStr, RetryPolicy retryPolicy, int connTimeoutMs, int sessionTimeoutMs) {
//通過builder構造者方法構造
return CuratorFrameworkFactory.builder()
.connectString(connStr)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(connTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
.build();
}
}
2.2 節點的增刪改查
關於Znode節點的增刪改查,示例程式碼和註釋如下:
@Slf4j
public class TestCurator {
private static final String ZK_ADDRESS="127.0.0.1:2881";
@Test
public void createNode(){
CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
try {
//啟動客戶端,連線伺服器
client.start();
//建立一個節點,節點資料為 payload
String data = "Hello,ZK!";
byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
String zkPath = "/test/CURD/node-1";
//create方法可以建立Znode節點,但該方法不會立即建立節點,而是返回一個CreateBuilder構造者例項,並設定一些引數
//最終通過forPath方法完成真正的節點建立工作
client.create()
.creatingParentsIfNeeded()
//節點型別主要有四種
//PERSISTENT 持久化節點:持久節點,是指在節點建立後,就一直存在,直到有刪除操作來主動清除這個節點。持久節點的生命週期是永久有效,不會因為建立該節點的客戶端會話失效而消失
//PERSISTENT_SEQUENTIAL 持久化順序節點:持久化順序節點的每個父節點會為他的第一級子節點維護一份次序,會記錄每個子節點建立的先後順序
//EPHEMERAL臨時節點:臨時節點的生命週期和客戶端會話繫結
//EPHEMERAL_SEQUENTIAL臨時順序節點:帶有順序編號的臨時節點
.withMode(CreateMode.PERSISTENT)
.forPath(zkPath,bytes);
}catch (Exception e){
log.error(e.getMessage());
}finally {
client.close();
}
}
@Test
public void readNode(){
CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
try {
client.start();
String zkPath = "/test/CURD/node-1";
//無論是checkExists、getData、getChildren方法,都有一個共同的特點,就是方法法返回的是構造者例項,不會立即執行,最終需要呼叫forPath進行呼叫
Stat stat = client.checkExists().forPath(zkPath);
if (stat!=null){
byte[] bytes = client.getData().forPath(zkPath);
String data = new String(bytes);
log.info("data={}",data);
String parentPath = "/test";
List<String> list = client.getChildren().forPath(parentPath);
for (String child : list) {
log.info("child={}",child);
}
}
}catch (Exception e){
log.error(e.getMessage());
}finally {
client.close();
}
}
//節點的更新操作,可以分為同步更新與非同步更新,以下是同步更新
@Test
public void updateNode(){
CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
try {
client.start();
String zkPath = "/test/CURD/node-1";
String data = "Hello,Update_ZK";
byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
//使用setData()方法可以進行同步更新
client.setData().forPath(zkPath,bytes);
}catch (Exception e){
log.error(e.getMessage());
}finally {
client.close();
}
}
//節點的更新操作,可以分為同步更新與非同步更新,以下是非同步更新
@Test
public void updateNodeAsync(){
CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
try {
AsyncCallback callback = new AsyncCallback.StringCallback() {
@Override
public void processResult(int i, String s, Object o, String s1) {
log.info("i={};\ns={};\no={};\ns1={}",i,s,o,s1);
}
};
client.start();
String zkPath = "/test/CURD/node-1";
String data = "Hello,Update_Async_ZK";
byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
//使用setData()方法可以進行同步更新
client.setData()
.inBackground(callback)//設定回撥例項
.forPath(zkPath,bytes);
}catch (Exception e){
log.error(e.getMessage());
}finally {
client.close();
}
}
@Test
public void deleteNode(){
CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
try {
client.start();
String zkPath = "/test/CURD/node-1";
//刪除也可以非同步,用法與更新類似
client.delete().forPath(zkPath);
String parentPath = "/test";
List<String> list = client.getChildren().forPath(parentPath);
for (String child : list) {
log.info("child={}",child);
}
}catch (Exception e){
log.error(e.getMessage());
}finally {
client.close();
}
}
}
2.3 客戶端CURD示例
分散式節點ID生成示例,此示例的原理是zookeeper的順序節點可以進行自動編號,因此通過臨時順序節點即可實現ID生成,程式碼如下:
public class IDMaker {
private static final String ZK_ADDRESS = "127.0.0.1:2181";
private String createID(String prefix) {
CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
try {
client.start();
String forPath = client
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(prefix);
return forPath;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public String getID(String nodeName) {
//返回建立的節點的路徑名稱
String result = createID(nodeName);
if (result == null) {
return null;
}
int index = result.lastIndexOf(nodeName);
if (index >= 0) {
index += nodeName.length();
return index < result.length() ? result.substring(index) : "";
}
return result;
}
}
@Test
public void testIDMaker(){
IDMaker idMaker = new IDMaker();
for (int i = 0; i < 10; i++) {
System.out.println("####"+idMaker.getID("/test/IDMake-"));
}
}
三、分散式事件監聽
針對ZooKeeper服務端的節點操作事件監聽,是客戶端操作伺服器的一項重點工作。在Curator的API中,事件監聽有兩種方式:一是標準的觀察者模式,二是快取監聽模式。標準的觀察者模式,是通過Watcher監聽器實現,快取監聽模式則是引入了本地快取Cache機制實現。簡單地理解,Cache在客戶端快取了Znode的各種狀態,當感知到Zookeeper叢集的Znode變化,會觸發event事件,註冊到這些事件上的監聽器會處理這些事件。雖然Cache是快取機制,但是可以藉助Cache實現事件的監聽,並且Watcher只能監聽一次,Cache可以多次監聽。
3.1 Watcher
下面是Watcher的用法:
@Slf4j
public class TestZkWatcher {
private static final String ZK_ADDRESS="127.0.0.1:2181";
private final String workerPath = "/test/listener/remoteNode";
private final String subWorkerPath = "/test/listener/remoteNode/id-";
@Test
public void testWatcher(){
CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
client.start();
try {
Stat stat = client.checkExists().forPath(workerPath);
if (stat==null){
client.create().creatingParentsIfNeeded().forPath(workerPath);
}
//構建Watcher例項
Watcher watcher = new Watcher() {
//process是回撥方法;Watcher只能監聽一次
@Override
public void process(WatchedEvent watchedEvent) {
//輸出結果為:
//####監聽的變化 watchEvent=WatchedEvent state:SyncConnected type:NodeDataChanged path:/test/listener/remoteNode
//具體含義可以查下表
System.out.println("####監聽的變化 watchEvent="+watchedEvent);
}
};
//一般來說可以通過GetDataBuilder、GetChildrenBuilder、ExistsBuidler等實現了Watchable介面的構造者例項,通過其usingWatcher方法使用Watcher監聽器
//比如下面getData()返回的就是一個GetDataBuilder例項
byte[] bytes = client.getData().usingWatcher(watcher).forPath(workerPath);
log.info("監聽節點內容={}",new String(bytes));
client.setData().forPath(workerPath,"第一次變動".getBytes(StandardCharsets.UTF_8));
client.setData().forPath(workerPath,"第二次變動".getBytes(StandardCharsets.UTF_8));
Thread.sleep(Integer.MAX_VALUE);
}catch (Exception e){
e.printStackTrace();
}
}
}
在WatchedEvent中一共有三個屬性:keeperState通知狀態、eventType事件型別、path節點路徑,三個屬性。關於通知狀態和事件型別如下圖:(圖片來自高併發核心程式設計卷一)
3.2 Cache
Curator引入的Cache快取實現,Cache快取擁有一個系列的型別,包括了Node Cache 、Path Cache、Tree Cache三組類:
- Node Cache節點快取可以用於ZNode節點的監聽;
- Path Cache子節點快取用於ZNode的子節點的監聽;
- Tree Cache樹快取是Path Cache的增強,不僅僅能監聽子節點,也能監聽ZNode節點自身。
下面是Node Cache的例子:
@Test
public void testNodeCache(){
CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
client.start();
try {
NodeCache nodeCache = new NodeCache(client,workerPath,false);
NodeCacheListener listener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData currentData = nodeCache.getCurrentData();
log.info("####節點變化,path={}",currentData.getPath());
log.info("####節點變化,data={}",new String(currentData.getData(),StandardCharsets.UTF_8));
log.info("####節點變化,stat={}",currentData.getStat());
}
};
nodeCache.getListenable().addListener(listener);
nodeCache.start();
client.setData().forPath(workerPath,"第111111次變動".getBytes(StandardCharsets.UTF_8));
Thread.sleep(1000);
client.setData().forPath(workerPath,"第222222次變動".getBytes(StandardCharsets.UTF_8));
Thread.sleep(1000);
client.setData().forPath(workerPath,"第333333次變動".getBytes(StandardCharsets.UTF_8));
Thread.sleep(Integer.MAX_VALUE);
}catch (Exception e){
e.printStackTrace();
}
}
下面是Path Cache的例子:
@Test
public void testPathCache() {
CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
client.start();
try {
PathChildrenCache cache =
new PathChildrenCache(client, workerPath, true);
PathChildrenCacheListener l =
new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) {
try {
ChildData data = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
log.info("####子節點增加, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
case CHILD_UPDATED:
log.info("####子節點更新, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
case CHILD_REMOVED:
log.info("####子節點刪除, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
default:
break;
}
} catch (
UnsupportedEncodingException e) {
e.printStackTrace();
}
}
};
cache.getListenable().addListener(l);
cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
for (int i = 0; i < 3; i++) {
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(subWorkerPath+i, ("第"+i+"次新增").getBytes(StandardCharsets.UTF_8));
}
Thread.sleep(1000);
for (int i = 0; i < 3; i++) {
Stat stat = client.checkExists().forPath(subWorkerPath + i);
if (stat != null){
client.delete().forPath(subWorkerPath+i);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
下面是Tree Cache的例子:
@Test
public void testTreeCache() {
CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
client.start();
try {
Stat statP = client.checkExists().forPath(workerPath);
if (statP == null){
client.create().creatingParentsIfNeeded().forPath(workerPath);
}
TreeCache treeCache =
new TreeCache(client, workerPath);
TreeCacheListener l =
new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client,
TreeCacheEvent event) {
try {
ChildData data = event.getData();
if (data == null) {
log.info("資料為空");
return;
}
switch (event.getType()) {
case NODE_ADDED:
log.info("####節點增加, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
case NODE_UPDATED:
log.info("####節點更新, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
case NODE_REMOVED:
log.info("####節點刪除, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
default:
break;
}
} catch (
UnsupportedEncodingException e) {
e.printStackTrace();
}
}
};
treeCache.getListenable().addListener(l);
treeCache.start();
Thread.sleep(1000);
for (int i = 0; i < 3; i++) {
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(subWorkerPath+i, ("第"+i+"次新增").getBytes(StandardCharsets.UTF_8));
}
Thread.sleep(1000);
for (int i = 0; i < 3; i++) {
Stat stat = client.checkExists().forPath(subWorkerPath + i);
if (stat != null){
client.delete().forPath(subWorkerPath+i);
}
}
Thread.sleep(1000);
client.delete().forPath(workerPath);
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}
}
對於Zookeeper3.6.x之前版本,Curator提供的Cache分為Path Cache、NodeCache、Tree Cache,三者隸屬於三個獨立的類。對於之後的版本,統一使用一個工具類CuratorCache實現。就是說,Zookeeper3.6.x之後版本,Path Cache、Node Cache、Tree Cache均標記為廢棄,建議使用Curator Cache。例子如下:
@Test
public void testCuratorCache() {
CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
client.start();
try {
/*預設已指定節點為根的整個子樹都被快取;如果使用SINGLE_NODE_CACHE則只會快取指定的節點
* CuratorCache.Options除了單節點還有兩個選項:
* COMPRESSED_DATA: 通過org.apache.curator.framework.api.GetDataBuilder.decompressed()解壓資料
* DO_NOT_CLEAR_ON_CLOSE:當通過close()關閉快取時,會通過CuratorCacheStorage.clear()清除storage,此選項可以防止storage被清除
* */
CuratorCache curatorCache = CuratorCache.build(client, workerPath, CuratorCache.Options.SINGLE_NODE_CACHE);
//通過lambda構建listener
CuratorCacheListener listener = CuratorCacheListener.builder()
//新增快取中的資料時呼叫
.forCreates(node -> System.out.println(String.format("####Node created: [%s]", node)))
//修改快取中的資料時呼叫
.forChanges((oldNode, node) -> System.out.println(String.format("####Node changed. Old: [%s] New: [%s]", oldNode, node)))
//刪除快取中的資料時呼叫
.forDeletes(oldNode -> System.out.println(String.format("####Node deleted. Old value: [%s]", oldNode)))
//初始化完成時呼叫
.forInitialized(() -> System.out.println("####Cache initialized"))
.build();
curatorCache.listenable().addListener(listener);
//或者匿名建立listener
// curatorCache.listenable().addListener(new CuratorCacheListener() {
// @Override
// public void event(Type type, ChildData oldData, ChildData data) {
// switch (type.name()) {
// case "NODE_CREATED":
// System.out.println("####NODE_CREATED: " + oldData + " : " + data);
// break;
// case "NODE_CHANGED":
// System.out.println("####NODE_CHANGED: " + oldData + " : " + data);
// break;
// case "NODE_DELETED":
// System.out.println("####NODE_DELETED: " + oldData + " : " + data);
// break;
// default:
// break;
// }
// }
// });
curatorCache.start();
client.setData().forPath(workerPath, "第111111次變動".getBytes(StandardCharsets.UTF_8));
Thread.sleep(1000);
client.setData().forPath(workerPath, "第222222次變動".getBytes(StandardCharsets.UTF_8));
Thread.sleep(1000);
client.setData().forPath(workerPath, "第333333次變動".getBytes(StandardCharsets.UTF_8));
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}
}
四、分散式鎖
4.1 自定義Zookeeper分散式鎖
我們先來了解什麼是公平鎖、非公平鎖,可重入鎖、不可重入鎖。以下內容來自我的部落格Java多執行緒:
公平鎖和非公平鎖
公平鎖是指多個執行緒按照申請鎖的順序來獲取鎖,執行緒直接進入佇列中排隊,佇列中的第一個執行緒才能獲得鎖。公平鎖的優點是等待鎖的執行緒不會餓死。缺點是整體吞吐效率相對非公平鎖要低,等待佇列中除第一個執行緒以外的所有執行緒都會阻塞,CPU喚醒阻塞執行緒的開銷比非公平鎖大。
非公平鎖是多個執行緒加鎖時直接嘗試獲取鎖,獲取不到才會到等待佇列的隊尾等待。但如果此時鎖剛好可用,那麼這個執行緒可以無需阻塞直接獲取到鎖,所以非公平鎖有可能出現後申請鎖的執行緒先獲取鎖的場景。非公平鎖的優點是可以減少喚起執行緒的開銷,整體的吞吐效率高,因為執行緒有機率不阻塞直接獲得鎖,CPU不必喚醒所有執行緒。缺點是處於等待佇列中的執行緒可能會餓死,或者等很久才會獲得鎖。
以水井為例,由管理員看守且管理員只有一把鎖,如果前面有人打水,那麼這個d想打水就必須排隊,且必須去隊尾排隊,管理員只會給隊伍中最前面的人鎖並讓你去打水。
如果是非公平鎖,即便隊伍中有等待的人,但如果剛好上一個人剛打完水交還鎖且管理員還沒有允許下一個人去打水時,這時來了一個插隊的人,這個插隊的人可以直接去拿鎖不需要排隊。
可重入和不可重入鎖
以水井為例,有多個人在排隊打水,此時管理員允許鎖和同一個人的多個水桶繫結。這個人用多個水桶打水時,第一個水桶和鎖繫結並打完水之後,第二個水桶也可以直接和鎖繫結並開始打水,所有的水桶都打完水之後打水人才會將鎖還給管理員。這個人的所有打水流程都能夠成功執行,後續等待的人也能夠打到水。這就是可重入鎖。
如果是非可重入鎖的話,此時管理員只允許鎖和同一個人的一個水桶繫結。第一個水桶和鎖繫結打完水之後並不會釋放鎖,導致第二個水桶不能和鎖繫結也無法打水。當前執行緒出現死鎖,整個等待佇列中的所有執行緒都無法被喚醒。
下面我們來根據zookeeper實現公平可重入鎖,其原理主要是這樣的:
- Zookeeper的臨時順序節點,會按順序不斷遞增是一個天然的順序發號器
- Zookeeper的節點是遞增且有序的,可以保證鎖的公平。我們只需要加鎖前判斷當前的節點是否是節點中最小的即可,這樣就能保證鎖的公平。
- Zookeeper的節點監聽機制,可以保證鎖的傳遞有序和高效,不像CLH鎖,需要不斷輪詢前一個節點的狀態,耗費資源。(CLH鎖可以見我的顯示鎖學習筆記5.3節)同時還可以避免驚群效應,即一個節點釋放鎖,所有的節點都去搶鎖。
也就是說我們將zookeeper當作鎖的佇列,佇列中最小的就是持有鎖的,其餘節點等待並監聽前一個節點的刪除事件,如果前一個節點被刪除,說明前一個執行緒釋放了鎖,當前執行緒就可以去搶鎖了。
我們下面基於zookeeper實現一個簡單的公平可重入鎖。首先定義一個介面:
public interface Lock {
boolean lock();
boolean unlock();
}
然後編寫一個單例的zookeeper的客戶端:
public class ZkClient {
private static ZkClient zkClient = null;
private CuratorFramework client = null;
public CuratorFramework getClient() {
return client;
}
public void setClient(CuratorFramework client) {
this.client = client;
}
private static final String ZK_ADDRESS = "127.0.0.1:2181";
//餓漢式單例模式,通過synchronized保證執行緒安全
private ZkClient(){}
public static synchronized ZkClient getInstance(){
if (zkClient == null){
zkClient = new ZkClient();
zkClient.init();
}
return zkClient;
}
public void init() {
if (client == null){
//建立客戶端
client = ClientFactory.createSimple(ZK_ADDRESS);
//啟動客戶端例項,連線伺服器
client.start();
}
}
}
然後我們編寫分散式鎖,程式碼如下:
public class ZkLock implements Lock {
//我們分散式鎖使用的節點的路徑
private static final String LOCK_PATH = "/lock/mylock";
//分散式鎖每一個節點的字首
private static final String LOCK_PREFIX = LOCK_PATH + "/";
//當前鎖的序號,這個屬性是物件私有的,因為鎖佇列上每一個節點的狀態是不一樣的
private String lockNumber;
//前一個鎖的序號,這個屬性是物件私有的,因為鎖佇列上每一個節點的狀態是不一樣的
private String lastNumber;
CuratorFramework client = null;
//可重入鎖計數
final AtomicInteger lockCount = new AtomicInteger(0);
Thread thread;
public ZkLock() {
//獲取單例客戶端
ZkClient.getInstance().init();
client = ZkClient.getInstance().getClient();
try {
Stat stat = client.checkExists().forPath(LOCK_PATH);
if (stat == null) {
client.create().creatingParentsIfNeeded().forPath(LOCK_PATH);
}
} catch (Exception e) {
e.printStackTrace();
}
}
//加鎖方法
@Override
public boolean lock() {
//鎖自己,防止出現重入的併發
synchronized (this) {
System.out.println("####當前數量"+lockCount.get());
if (lockCount.get() == 0) {
//第一次搶鎖,設定當前執行緒
thread = Thread.currentThread();
lockCount.incrementAndGet();
} else {
//不是當前執行緒不能重入
if (!thread.equals(Thread.currentThread())) {
return false;
}
//是當前執行緒可以進行重入,直接返回true
lockCount.incrementAndGet();
return true;
}
}
try {
//通過zookeeper進行加鎖
boolean locked = tryLock();
if (locked) {
return true;
}
//如果獲取鎖失敗,則在佇列中等待
while (!locked) {
//呼叫等待方法
await();
//如果結束等待,說明前一個節點被刪除,當前節點被喚醒
//這時在判斷一次自己是否是佇列最小節點,是則加鎖成功
List<String> waiters = getWaiters();
if (checkLocked(waiters)) {
locked = true;
}
}
return true;
} catch (Exception e) {
lockCount.decrementAndGet();
e.printStackTrace();
}
return false;
}
//解鎖方法
@Override
public boolean unlock() {
//不是當前執行緒不能進行解鎖
if (!thread.equals(Thread.currentThread())) {
return false;
}
lockCount.decrementAndGet();
if (lockCount.get() < 0) {
throw new RuntimeException("當前並未上鎖,無法釋放鎖");
}
if (lockCount.get() != 0) {
return true;
}
try {
Stat stat = client.checkExists().forPath(LOCK_PREFIX + lockNumber);
if (stat != null) {
//刪除自己的節點,表示當前持有的鎖已釋放
client.delete().forPath(LOCK_PREFIX + lockNumber);
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
//嘗試加鎖
private boolean tryLock() throws Exception {
//建立一個臨時順序節點
String forPath = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(LOCK_PREFIX);
//獲取此節點的zookeeper生成的序號
lockNumber = forPath.substring(LOCK_PREFIX.length());
//獲取佇列中所有節點,判斷自己是否是最小的,是則加鎖成功。
List<String> waiters = getWaiters();
if (checkLocked(waiters)) {
return true;
}
//搜尋自己在佇列中的位置
int index = Collections.binarySearch(waiters, lockNumber);
if (index < 0) {
throw new RuntimeException("發生異常,找不到當前節點");
}
//儲存自己的前一個節點的序號
lastNumber = LOCK_PREFIX + waiters.get(index - 1);
return false;
}
//檢查自己是不是最小的節點
private boolean checkLocked(List<String> waiters) {
Collections.sort(waiters);
return lockNumber.equals(waiters.get(0));
}
//獲取等待佇列
private List<String> getWaiters() throws Exception {
return client.getChildren().forPath(LOCK_PATH);
}
//等待方法
private void await() throws Exception {
if (lastNumber == null) {
throw new RuntimeException("前一節點為空");
}
//利用CountDownLatch進行等待
CountDownLatch latch = new CountDownLatch(1);
//當前節點監聽前一個節點的事件,如果前一個結點被刪除,則結束等待
client.getData().usingWatcher((Watcher) event -> {
System.out.println("####監聽到變化,event=" + event);
latch.countDown();
}).forPath(lastNumber);
System.out.println("####" + Thread.currentThread().getName() + "===>" + "進入了await" + "當前是" + lockNumber + "監聽了" + lastNumber);
latch.await();
System.out.println("####" + Thread.currentThread().getName() + "===>" + "解除了await");
}
}
然後我們編寫一個測試用例:
public class TestZkLock {
int count=0;
@Test
public void testZkLock() throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
service.execute(() -> {
ZkLock zkLock = new ZkLock();
zkLock.lock();
for (int j = 0; j < 10; j++) {
count++;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
zkLock.unlock();
System.out.println("####"+count);
});
}
Thread.sleep(Integer.MAX_VALUE);
}
}
4.2 Curator的分散式鎖
當然實際開發中不建議自己造輪子,而是使用現成的元件。Curator客戶端中提供了各種分散式鎖的實現,這裡以InterProcessMutex可重入鎖為例學習其使用方法:
@Test
public void testCuratorLock() throws InterruptedException {
CuratorFramework client = ZkClient.getInstance().getClient();
InterProcessMutex interProcessMutex = new InterProcessMutex(client, "/mutex");
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
service.execute(() -> {
//獲取互斥鎖
try {
interProcessMutex.acquire();
for (int j = 0; j < 10; j++) {
count++;
}
Thread.sleep(1000);
//釋放互斥鎖
interProcessMutex.release();
System.out.println("####"+count);
} catch (Exception e) {
e.printStackTrace();
}
});
}
Thread.sleep(Integer.MAX_VALUE);
}
4.3 分散式鎖的優缺點
Zookeeper的分散式鎖的優點是有效的解決分散式問題和不可重入問題,使用簡單。缺點是效能不高。因為每次建立和釋放鎖都要動態建立銷燬瞬時節點,這樣頻繁的的網路通訊效能是很低的。
目前的分散式鎖的方案中主要有兩種:
- 基於Redis的分散式鎖:適用於併發高,效能要求高,可靠性可以通過其他方式彌補。
- 基於Zookeeper的分散式鎖:適用於高可靠且併發不是很高的場景。