Dubbo教程-02-zookeeper簡介,一些API,分散式鎖
寫在前面
hello 大家好
我是御風
歡迎大家來到我的
Dubbo系列教程第2課
在dubbo的使用過程中
通常我們都會選擇zookeeper來作為 註冊中心
本次課我將為大家介紹Zookeeper以及使用示範
閱讀原文 :https://blog.bywind.cn/articles/2018/11/22/1542865223734.html
本課原始碼 : https://github.com/ibywind/dubbo-learn
zookeeper介紹
我們需要下載 zk
目前官網的一個穩定版本 zookeeper-3.4.12.tar.gz
http://mirrors.hust.edu.cn/apache/zookeeper/stable/
ZooKeeper是一個分散式的,開放原始碼的分散式應用程式協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要元件。它是一個為分散式應用提供一致性服務的軟體,提供的功能包括:配置維護、域名服務、分散式同步、組服務等。
zookeeper 可以保證最終資料一致性
zk 例項 可以主從複製 並且 效率很高 可靠性也高
結構化儲存
類似模擬檔案系統的一種儲存方式
因為分散式的資料最終一致性
和結構化持久儲存
以及優越的讀寫效能
我們通常會利用 zk 做下面這幾件事情
1.命名服務
在zookeeper的檔案系統裡建立一個目錄,即有唯一的path。在我們使用tborg無法確定上游程式的部署機器時即可與下游程式約定好path,通過path即能互相探索發現。
2.配置管理
程式總是需要配置的,如果程式分散部署在多臺機器上,要逐個改變配置就變得困難。現在把這些配置全部放到zookeeper上去,儲存在 Zookeeper 的某個目錄節點中,然後所有相關應用程式對這個目錄節點進行監聽,一旦配置資訊發生變化,每個應用程式就會收到 Zookeeper 的通知,然後從 Zookeeper 獲取新的配置資訊應用到系統中就好
3.叢集管理
所謂叢集管理無在乎兩點:是否有機器退出和加入、選舉master。
對於第一點,所有機器約定在父目錄GroupMembers下建立臨時目錄節點,然後監聽父目錄節點的子節點變化訊息。一旦有機器掛掉,該機器與 zookeeper的連線斷開,其所建立的臨時目錄節點被刪除,所有其他機器都收到通知:某個兄弟目錄被刪除,於是,所有人都知道:它上船了。
新機器加入也是類似,所有機器收到通知:新兄弟目錄加入,highcount又有了,對於第二點,我們稍微改變一下,所有機器建立臨時順序編號目錄節點,每次選取編號最小的機器作為master就好。
**4.分散式鎖 **
有了zookeeper的一致性檔案系統,鎖的問題變得容易。鎖服務可以分為兩類,一個是保持獨佔,另一個是控制時序。
對於第一類,我們將zookeeper上的一個znode看作是一把鎖,通過createznode的方式來實現。所有客戶端都去建立 /distribute_lock 節點,最終成功建立的那個客戶端也即擁有了這把鎖。用完刪除掉自己建立的distribute_lock 節點就釋放出鎖。
對於第二類, /distribute_lock 已經預先存在,所有客戶端在它下面建立臨時順序編號目錄節點,和選master一樣,編號最小的獲得鎖,用完刪除,依次方便。
5.佇列管理
兩種型別的佇列:
1、同步佇列,當一個佇列的成員都聚齊時,這個佇列才可用,否則一直等待所有成員到達。
2、佇列按照 FIFO 方式進行入隊和出隊操作。
第一類,在約定目錄下建立臨時目錄節點,監聽節點數目是否是我們要求的數目。
第二類,和分散式鎖服務中的控制時序場景基本原理一致,入列有編號,出列按編號。
zk角色介紹
本地安裝
首先去我上面的地址下載zk
然後解壓 放到一個 資料夾中
進入 zk資料夾 基本這個樣子
我們 需要做一個 重新命名 在我們 執行之前
把你看到的 那個 .cfg 檔案 改成 我這樣
然後 回到 bin 目錄 啟動
接下來我們會利用分散式鎖 這個例子 演示 zk的 JAVA API
ZkDemo.java
package cn.bywind;
import org.apache.zookeeper.*;
public class ZkDemo implements Watcher {
public static void main(String[] args) throws Exception {
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181",3000*10,new ZkDemo());
ZooKeeper.States state = zooKeeper.getState();
System.out.println(state);
zooKeeper.exists("/data",new ZkDemo());
zooKeeper.create("/data","bywind".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread.sleep(2000);
}
@Override
public void process(WatchedEvent event) {
System.out.println("watcher:"+event.getState());
if (event.getType().equals(Event.EventType.NodeCreated)){
System.out.println("watcher:node created");
}
}
}
HADemo.java 看ZK如何實現HA
package cn.bywind;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class HADemo implements Watcher {
private String role = "master";
private static final String HA_PATH = "/HA";
private ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
HADemo haDemo = new HADemo();
ZooKeeper zk = haDemo.getZk();
Stat exists = zk.exists(HA_PATH, haDemo);
if (exists == null) {
zk.create(HA_PATH, "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} else {
haDemo.role = "slave";
System.out.println("i am the :" + haDemo.role);
}
while (true) {
if ("q".equalsIgnoreCase(readFromConsole())) {
System.exit(0);
} else {
System.out.println("輸入q停止當前程式");
}
}
}
private ZooKeeper getZk () {
try {
zooKeeper = new ZooKeeper("127.0.0.1:2181", 3000, null);
return zooKeeper;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
@Override
public void process(WatchedEvent event) {
if (event.getType().equals(Event.EventType.NodeCreated)) {
System.out.println("I am the :" + role);
}
if (event.getType().equals(Event.EventType.NodeDeleted)) {
System.out.println("master is down");
this.role = "master";
try {
zooKeeper.create(HA_PATH, "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("I am the :" + role);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static String readFromConsole() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
try {
return reader.readLine();
} catch (Exception e) {
e.printStackTrace();
return "";
}
}
}
客戶端通過事件回撥(觀察者模式)
實現對 zk 服務端的 心跳監控
事件同步
分散式鎖
我會按照兩種方式 給大家建立分散式鎖d
-
zk 原生模式
-
第三方框架 Curator
首先來看 原生 硬編碼 模式吧
我們和 zk 打交道 肯定是需要 引入對應的依賴的
接下來就是 程式設計模型了
首先你的類 需要實現 Watcher
我來看他的 具體原始碼
然後就是 具體的編碼過程了
為了給大更好的演示
我們首先來回顧一下傳統JAVA中的鎖機制
LocalLockDemo.java
package cn.bywind;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class LocalLockDemo {
static int n = 500;
public static void secskill() {
--n;
}
public static void main(String[] args) {
final ReentrantLock lock = new ReentrantLock(true);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
boolean b = lock.tryLock(2, TimeUnit.SECONDS);
if (b){
String name = Thread.currentThread().getName();
secskill();
System.out.println("name:"+name+"--"+n);
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
};
Runnable runnable2 = new Runnable() {
@Override
public void run() {
String name = Thread.currentThread().getName();
secskill();
System.out.println("name:"+name+"--"+n);
}
};
for (int i = 0; i < 20; i++) {
new Thread(runnable).start();
}
}
}
根據上面的 鎖 機制
我們嘗試使用zookeeper 來實現
他的 tryLock lock 等方法
DistributedLock.java
package cn.bywind;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class DistributedLock implements Lock, Watcher {
private ZooKeeper zk = null;
// 根節點
private String ROOT_LOCK = "/locks";
// 競爭的資源
private String lockName;
// 等待的前一個鎖
private String WAIT_LOCK;
// 當前鎖
private String CURRENT_LOCK;
// 計數器
private CountDownLatch countDownLatch;
private int sessionTimeout = 30000;
private List<Exception> exceptionList = new ArrayList<Exception>();
/**
* 配置分散式鎖
* @param config 連線的url
* @param lockName 競爭資源
*/
public DistributedLock(String config, String lockName) {
this.lockName = lockName;
try {
// 連線zookeeper
zk = new ZooKeeper(config, sessionTimeout, this);
Stat stat = zk.exists(ROOT_LOCK, false);
if (stat == null) {
// 如果根節點不存在,則建立根節點
zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
// 節點監視器
public void process(WatchedEvent event) {
if (this.countDownLatch != null && event.getType()== Event.EventType.NodeDeleted) {
this.countDownLatch.countDown();
}
}
public void lock() {
if (exceptionList.size() > 0) {
throw new LockException(exceptionList.get(0));
}
try {
if (this.tryLock()) {
System.out.println(Thread.currentThread().getName() + " " + lockName + "獲得了鎖");
return;
} else {
// 等待鎖
waitForLock(WAIT_LOCK, sessionTimeout);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
public boolean tryLock() {
try {
String splitStr = "_lock_";
if (lockName.contains(splitStr)) {
throw new LockException("鎖名有誤");
}
// 建立臨時有序節點
CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(CURRENT_LOCK + " 已經建立");
// 取所有子節點
List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
// 取出所有lockName的鎖
List<String> lockObjects = new ArrayList<String>();
for (String node : subNodes) {
String _node = node.split(splitStr)[0];
if (_node.equals(lockName)) {
lockObjects.add(node);
}
}
Collections.sort(lockObjects);
System.out.println(Thread.currentThread().getName() + " 的鎖是 " + CURRENT_LOCK);
// 若當前節點為最小節點,則獲取鎖成功
if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
return true;
}
// 若不是最小節點,則找到自己的前一個節點
String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return false;
}
public boolean tryLock(long timeout, TimeUnit unit) {
try {
if (this.tryLock()) {
return true;
}
return waitForLock(WAIT_LOCK, timeout);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// 等待鎖
private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
if (stat != null) {
System.out.println(Thread.currentThread().getName() + "等待鎖 " + ROOT_LOCK + "/" + prev);
this.countDownLatch = new CountDownLatch(1);
// 計數等待,若等到前一個節點消失,則precess中進行countDown,停止等待,獲取鎖
this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
this.countDownLatch = null;
System.out.println(Thread.currentThread().getName() + " 等到了鎖");
}
return true;
}
public void unlock() {
try {
System.out.println("釋放鎖 " + CURRENT_LOCK);
zk.delete(CURRENT_LOCK, -1);
CURRENT_LOCK = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
public Condition newCondition() {
return null;
}
public void lockInterruptibly() throws InterruptedException {
this.lock();
}
public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e){
super(e);
}
public LockException(Exception e){
super(e);
}
}
}
zk 的分散式鎖實現
實現 lock 介面,把本地的實現 換成 zk znode 的讀寫實現
依靠 zk的優異讀寫效能 及 分散式協調能力
App.java
package cn.bywind;
/**
* Hello world!
*
*/
public class App
{
static int n = 500;
public static void secskill() {
System.out.println(--n);
}
public static void main(String[] args) {
Runnable runnable = new Runnable() {
public void run() {
DistributedLock lock = null;
try {
lock = new DistributedLock("127.0.0.1:2181", "testZk");
lock.lock();
secskill();
System.out.println(Thread.currentThread().getName() + "正在執行");
} finally {
if (lock != null) {
lock.unlock();
}
}
}
};
for (int i = 0; i < 10; i++) {
Thread t = new Thread(runnable);
t.start();
}
}
}
程式碼跑起來看看
執行緒 獲取鎖 釋放鎖
執行有序
大家可以看我的 視訊教程
總體的一個 思路就是
我們現在把 一個公共的 爭奪的 資源 放到了第三方 zk 哪裡
這個資源 就是一個 類似檔案目錄的東西 (你不可能在檔案目錄項建立兩個一樣名字的資源)
所以保證了唯一性
然後 你 來了 我給你一個 零時資源 讓你 排隊 。 (利用 零時 遞增 儲存方式)
然後 前一個人 釋放了 鎖
我就來看誰排在 他後面咯
然後 就順序給你就好了 。
我畫了個路程圖
接下來 我們開下 如何使用 第三方框架 實現
Curator
是Netflix公司開源的一個Zookeeper客戶端,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。
具體程式碼實現
首先我們來看一下 curator 操作 zk 的一些API
這裡演示了 增刪改查等操作
感覺確實比zk 的原生API簡單 不少
並且設計很優秀的
大家可以看下程式碼
CuratorTest.java
package cn.bywind;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
public class CuratorTest {
private static final String ZK_ADDRESS = "127.0.0.1:2181";
private static final String ZK_PATH = "/zktest";
@Test
public void testZk() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
System.out.println("zk client start successfully!");
client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(ZK_PATH);
/** 2.Client API test*/
// 2.1 Create node
String data1 = "hello";
print("create", ZK_PATH, data1);
client.create().
creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).
forPath(ZK_PATH, data1.getBytes());
// 2.2 Get node and data
print("ls", "/");
print(client.getChildren().forPath("/"));
print("get", ZK_PATH);
print(client.getData().forPath(ZK_PATH));
// 2.3 Modify data
String data2 = "world";
print("set", ZK_PATH, data2);
client.setData().forPath(ZK_PATH, data2.getBytes());
print("get", ZK_PATH);
print(client.getData().forPath(ZK_PATH));
// 2.4 Remove node
print("delete", ZK_PATH);
client.delete().forPath(ZK_PATH);
print("ls", "/");
print(client.getChildren().forPath("/"));
}
private static void print(String... cmds) {
StringBuilder text = new StringBuilder("$ ");
for (String cmd : cmds) {
text.append(cmd).append(" ");
}
System.out.println(text.toString());
}
private static void print(Object result) {
System.out.println(
result instanceof byte[]
? new String((byte[]) result)
: result);
}
}
演示效果如下
接下來就是 使用curator 實現 zk的分散式鎖
程式碼在下方
大家可以檢視下
為了方便測試
curator 可以自己模擬 zk 服務端
很方便
如果大家想要嘗試 可以註釋開啟
CuratorDistributedLockDemo.java
package cn.bywind;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* 分散式鎖演示說明
*/
public class CuratorDistributedLockDemo {
// zookeeper 鎖節點路徑,分散式鎖的相關操作都是在這個節點上進行
private final String lockPath = "/distributed-lock";
// zookeeper 服務地址,單機格式為:(127.0.0.1:2181),叢集格式為:(127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183)
private String connectString;
// Curator 客戶端重試策略
private RetryPolicy retry;
// Curator 客戶端物件
private CuratorFramework client;
// client2 使用者模擬其他客戶端
private CuratorFramework client2;
private TestingServer testingServer;
private TestingCluster testingCluster;
// 記憶體 zookeeper 伺服器(單個),用於測試,也可以選擇使用外部伺服器,如 127.0.0.1:2181
// 同時提供 TestingCluster,用於測試叢集
// 初始化資源
@Before
public void init() throws Exception {
// 建立一個測試 zookeeper 伺服器
//testingServer = new TestingServer();
// 獲取該伺服器的連結地址
//connectString = testingServer.getConnectString();
// 建立一個叢集測試 zookeeper 伺服器
//testingCluster = new TestingCluster(3);
// 獲取該伺服器的連結地址
//connectString = testingCluster.getConnectString();
connectString = "127.0.0.1:2181";
// 重試策略
// 初始休眠時間為 1000ms,最大重試次數為3
retry = new ExponentialBackoffRetry(1000, 3);
// 建立一個客戶端 60000(ms)為 session 超時時間,15000(ms)為連結超時時間
client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
client2 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
// 建立會話
client.start();
client2.start();
}
// 釋放資源
@After
public void close() {
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(testingServer);
CloseableUtils.closeQuietly(testingCluster);
}
// 共享鎖
@Test
public void sharedLock() throws Exception {
// 建立共享鎖
InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath);
// lock2 用於模擬其他客戶端
InterProcessLock lock2 = new InterProcessSemaphoreMutex(client2, lockPath);
// 獲取鎖物件
lock.acquire();
// 測試是否可以重入
// 超時獲取鎖物件(第一個引數為時間,第二個引數為時間單位),因為鎖已經被獲取,所以返回 false
Assert.assertFalse(lock.acquire(2, TimeUnit.SECONDS));
// 釋放鎖
lock.release();
// lock2 嘗試獲取鎖成功,因為鎖已經被釋放
Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));
lock2.release();
}
// 重入鎖
@Test
public void sharedReentrantLock() throws Exception {
// 建立可重入鎖
InterProcessLock lock = new InterProcessMutex(client, lockPath);
// lock2 用於模擬其他客戶端
InterProcessLock lock2 = new InterProcessMutex(client2, lockPath);
// lock 獲取鎖
lock.acquire();
try {
// lock 第二次獲取鎖
lock.acquire();
try {
// lock2 超時獲取鎖,因為鎖已經被 lock 客戶端佔用,所以獲取失敗,需要等 lock 釋放
Assert.assertFalse(lock2.acquire(2, TimeUnit.SECONDS));
} finally {
lock.release();
}
} finally {
// 重入鎖獲取與釋放需要一一對應,如果獲取2次,釋放1次,那麼該鎖依然是被佔用,如果將下面這行程式碼註釋,那麼會發現下面的 lock2 獲取鎖失敗
lock.release();
}
// 在 lock 釋放後,lock2 能夠獲取鎖
Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));
lock2.release();
}
// 讀寫鎖
@Test
public void sharedReentrantReadWriteLock() throws Exception {
// 建立讀寫鎖物件,因 curator 的實現原理,該鎖是公平的
InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, lockPath);
// lock2 用於模擬其他客戶端
InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client2, lockPath);
// 使用 lock 模擬讀操作
// 使用 lock2 模擬寫操作
// 獲取讀鎖(使用InterProcessMutex實現,所以是可以重入的)
final InterProcessLock readLock = lock.readLock();
// 獲取寫鎖(使用InterProcessMutex實現,所以是可以重入的)
final InterProcessLock writeLock = lock2.writeLock();
/**
* 讀寫鎖測試物件
*/
class ReadWriteLockTest {
// 測試資料變更欄位
private Integer testData = 0;
private Set<Thread> threadSet = new HashSet<Thread>();
// 寫入資料
private void write() throws Exception {
writeLock.acquire();
try {
Thread.sleep(10);
testData++;
System.out.println("寫入資料\t" + testData);
} finally {
writeLock.release();
}
}
// 讀取資料
private void read() throws Exception {
readLock.acquire();
try {
Thread.sleep(10);
System.out.println("讀取資料\t" + testData);
} finally {
readLock.release();
}
}
// 等待執行緒結束,防止test方法呼叫完成後,當前執行緒直接退出,導致控制檯無法輸出資訊
public void waitThread() throws InterruptedException {
for (Thread thread : threadSet) {
thread.join();
}
}
// 建立執行緒方法
private void createThread(final int type) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
if (type == 1) {
write();
} else {
read();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
threadSet.add(thread);
thread.start();
}
// 測試方法
public void test() {
for (int i = 0; i < 5; i++) {
createThread(1);
}
for (int i = 0; i < 5; i++) {
createThread(2);
}
}
}
ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest();
readWriteLockTest.test();
readWriteLockTest.waitThread();
}
// 訊號量
@Test
public void semaphore() throws Exception {
// 建立一個訊號量
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, lockPath, 6);
// semaphore2 用於模擬其他客戶端
InterProcessSemaphoreV2 semaphore2 = new InterProcessSemaphoreV2(client2, lockPath, 6);
// 獲取一個許可
Lease lease = semaphore.acquire();
Assert.assertNotNull(lease);
// semaphore.getParticipantNodes() 會返回當前參與訊號量的節點列表,倆個客戶端所獲取的資訊相同
Assert.assertEquals(semaphore.getParticipantNodes(), semaphore2.getParticipantNodes());
// 超時獲取一個許可
Lease lease2 = semaphore2.acquire(2, TimeUnit.SECONDS);
Assert.assertNotNull(lease2);
Assert.assertEquals(semaphore.getParticipantNodes(), semaphore2.getParticipantNodes());
// 獲取多個許可,引數為許可數量
Collection<Lease> leases = semaphore.acquire(2);
Assert.assertTrue(leases.size() == 2);
Assert.assertEquals(semaphore.getParticipantNodes(), semaphore2.getParticipantNodes());
// 超時獲取多個許可,第一個引數為許可數量
Collection<Lease> leases2 = semaphore2.acquire(2, 2, TimeUnit.SECONDS);
Assert.assertTrue(leases2.size() == 2);
Assert.assertEquals(semaphore.getParticipantNodes(), semaphore2.getParticipantNodes());
// 目前 semaphore 已經獲取 3 個許可,semaphore2 也獲取 3 個許可,加起來為 6 個,所以他們無法在進行許可獲取
// 無法獲取許可
Assert.assertNull(semaphore.acquire(2, TimeUnit.SECONDS));
Assert.assertNull(semaphore2.acquire(2, TimeUnit.SECONDS));
semaphore.returnLease(lease);
semaphore2.returnLease(lease2);
semaphore.returnAll(leases);
semaphore2.returnAll(leases2);
}
// 多重鎖
@Test
public void multiLock() throws Exception {
// 可重入鎖
InterProcessLock interProcessLock1 = new InterProcessMutex(client, lockPath);
// 不可重入鎖
InterProcessLock interProcessLock2 = new InterProcessSemaphoreMutex(client2, lockPath);
// 建立多重鎖物件
InterProcessLock lock = new InterProcessMultiLock(Arrays.asList(interProcessLock1, interProcessLock2));
// 獲取引數集合中的所有鎖
lock.acquire();
// 因為存在一個不可重入鎖,所以整個 InterProcessMultiLock 不可重入
Assert.assertFalse(lock.acquire(2, TimeUnit.SECONDS));
// interProcessLock1 是可重入鎖,所以可以繼續獲取鎖
Assert.assertTrue(interProcessLock1.acquire(2, TimeUnit.SECONDS));
// interProcessLock2 是不可重入鎖,所以獲取鎖失敗
Assert.assertFalse(interProcessLock2.acquire(2, TimeUnit.SECONDS));
// 釋放參數集合中的所有鎖
lock.release();
// interProcessLock2 中的所已經釋放,所以可以獲取
Assert.assertTrue(interProcessLock2.acquire(2, TimeUnit.SECONDS));
}
}
另外如果大家想更直觀瞭解ZK分散式鎖的話
可以參看我的視訊
總結
zookeeper在實際的開發過程中
用的地方非常多
比如 Hadoop hbase spark 等大資料庫框架
顯然 zk 已經是 分散式 大資料 時代
繞不過去的一道坎了
本次課講到的 分散式鎖 只是 他眾多功能的一個
其實 分散式鎖的 實現方案 有很多的
比如 redis 也是可以的 而且 速度 效能要比 zk 快
只是借這次課的一個機會 和大家一起重新溫習下 zk 的原理和使用