zookeeper(五) curator 鎖機制
分散式鎖的應用
分散式鎖服務宕機, ZooKeeper 一般是以叢集部署, 如果出現 ZooKeeper 宕機, 那麼只要當前正常的伺服器超過叢集的半數, 依然可以正常提供服務
持有鎖資源伺服器宕機, 假如一臺伺服器獲取鎖之後就宕機了, 那麼就會導致其他伺服器無法再獲取該鎖. 就會造成 死鎖 問題, 在 Curator 中, 鎖的資訊都是儲存在臨時節點上, 如果持有鎖資源的伺服器宕機, 那麼 ZooKeeper 就會移除它的資訊, 這時其他伺服器就能進行獲取鎖操作。
當然了分散式鎖還可以基於redis實現,其string型別的 setnx key value命令 結合expire命令。
前面的準備工作:
package Lock; import java.util.concurrent.TimeUnit; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; public class DistributedLockDemo { // ZooKeeper 鎖節點路徑, 分散式鎖的相關操作都是在這個節點上進行 private final String lockPath = "/distributed-lock"; // ZooKeeper 服務地址, 單機格式為:(127.0.0.1:2181), // 叢集格式為:(127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183) private String connectString; // Curator 客戶端重試策略 private RetryPolicy retry; // Curator 客戶端物件 private CuratorFramework client; // client2 使用者模擬其他客戶端 private CuratorFramework client2; // 初始化資源 @Before public void init() throws Exception { // 設定 ZooKeeper 服務地址為本機的 2181 埠 connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"; // 重試策略 // 初始休眠時間為 1000ms, 最大重試次數為 3 retry = new ExponentialBackoffRetry(1000, 3); // 建立一個客戶端, 60000(ms)為 session 超時時間, 15000(ms)為連結超時時間 client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry); client2 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry); // 建立會話 client.start(); client2.start(); } // 釋放資源 @After public void close() { CloseableUtils.closeQuietly(client); } }
zookeper的實現主要有下面四類類:
InterProcessMutex:分散式可重入排它鎖
InterProcessSemaphoreMutex:分散式排它鎖
InterProcessReadWriteLock:分散式讀寫鎖
InterProcessMultiLock:將多個鎖作為單個實體管理的容器
1.共享鎖,不可重入--- InterProcessSemaphoreMutex
InterProcessSemaphoreMutex是一種不可重入的互斥鎖,也就意味著即使是同一個執行緒也無法在持有鎖的情況下再次獲得鎖,所以需要注意,不可重入的鎖很容易在一些情況導致死鎖。
@Test public void sharedLock() throws Exception { // 建立共享鎖 final InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath); // lock2 用於模擬其他客戶端 final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client2, lockPath); new Thread(new Runnable() { @Override public void run() { // 獲取鎖物件 try { lock.acquire(); System.out.println("1獲取鎖==============="); // 測試鎖重入 Thread.sleep(5 * 1000); lock.release(); System.out.println("1釋放鎖==============="); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 獲取鎖物件 try { lock2.acquire(); System.out.println("2獲取鎖==============="); Thread.sleep(5 * 1000); lock2.release(); System.out.println("2釋放鎖==============="); } catch (Exception e) { e.printStackTrace(); } } }).start(); Thread.sleep(20 * 1000); }
2.共享可重入鎖--- InterProcessMutex
此鎖可以重入,但是重入幾次需要釋放幾次。
@Test public void sharedReentrantLock() throws Exception { // 建立共享鎖 final InterProcessLock lock = new InterProcessMutex(client, lockPath); // lock2 用於模擬其他客戶端 final InterProcessLock lock2 = new InterProcessMutex(client2, lockPath); final CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new Runnable() { @Override public void run() { // 獲取鎖物件 try { lock.acquire(); System.out.println("1獲取鎖==============="); // 測試鎖重入 lock.acquire(); System.out.println("1再次獲取鎖==============="); Thread.sleep(5 * 1000); lock.release(); System.out.println("1釋放鎖==============="); lock.release(); System.out.println("1再次釋放鎖==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 獲取鎖物件 try { lock2.acquire(); System.out.println("2獲取鎖==============="); // 測試鎖重入 lock2.acquire(); System.out.println("2再次獲取鎖==============="); Thread.sleep(5 * 1000); lock2.release(); System.out.println("2釋放鎖==============="); lock2.release(); System.out.println("2再次釋放鎖==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); countDownLatch.await(); }
結果:
1獲取鎖===============
1再次獲取鎖===============
1釋放鎖===============
1再次釋放鎖===============
2獲取鎖===============
2再次獲取鎖===============
2釋放鎖===============
2再次釋放鎖===============
原理:
InterProcessMutex通過在zookeeper的某路徑節點下建立臨時序列節點來實現分散式鎖,即每個執行緒(跨程序的執行緒)獲取同一把鎖前,都需要在同樣的路徑下建立一個節點,節點名字由uuid + 遞增序列組成。而通過對比自身的序列數是否在所有子節點的第一位,來判斷是否成功獲取到了鎖。當獲取鎖失敗時,它會新增watcher來監聽前一個節點的變動情況,然後進行等待狀態。直到watcher的事件生效將自己喚醒,或者超時時間異常返回。
3.共享可重入讀寫鎖
讀鎖和讀鎖不互斥,只要有寫鎖就互斥。
@Test public void sharedReentrantReadWriteLock() throws Exception { // 建立共享可重入讀寫鎖 final InterProcessReadWriteLock locl1 = new InterProcessReadWriteLock(client, lockPath); // lock2 用於模擬其他客戶端 final InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client2, lockPath); // 獲取讀寫鎖(使用 InterProcessMutex 實現, 所以是可以重入的) final InterProcessLock readLock = locl1.readLock(); final InterProcessLock readLockw = lock2.readLock(); final CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new Runnable() { @Override public void run() { // 獲取鎖物件 try { readLock.acquire(); System.out.println("1獲取讀鎖==============="); // 測試鎖重入 readLock.acquire(); System.out.println("1再次獲取讀鎖==============="); Thread.sleep(5 * 1000); readLock.release(); System.out.println("1釋放讀鎖==============="); readLock.release(); System.out.println("1再次釋放讀鎖==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 獲取鎖物件 try { Thread.sleep(500); readLockw.acquire(); System.out.println("2獲取讀鎖==============="); // 測試鎖重入 readLockw.acquire(); System.out.println("2再次獲取讀鎖=============="); Thread.sleep(5 * 1000); readLockw.release(); System.out.println("2釋放讀鎖==============="); readLockw.release(); System.out.println("2再次釋放讀鎖==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); countDownLatch.await(); }
結果:
1獲取讀鎖===============
1再次獲取讀鎖===============
2獲取讀鎖===============
2再次獲取讀鎖==============
1釋放讀鎖===============
2釋放讀鎖===============
1再次釋放讀鎖===============
2再次釋放讀鎖===============
4. 共享訊號量
@Test public void semaphore() throws Exception { // 建立一個訊號量, Curator 以公平鎖的方式進行實現 final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, lockPath, 1); final CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new Runnable() { @Override public void run() { // 獲取鎖物件 try { // 獲取一個許可 Lease lease = semaphore.acquire(); logger.info("1獲取讀訊號量==============="); Thread.sleep(5 * 1000); semaphore.returnLease(lease); logger.info("1釋放讀訊號量==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 獲取鎖物件 try { // 獲取一個許可 Lease lease = semaphore.acquire(); logger.info("2獲取讀訊號量==============="); Thread.sleep(5 * 1000); semaphore.returnLease(lease); logger.info("2釋放讀訊號量==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); countDownLatch.await(); }
結果:
09:39:26 [Lock.DistributedLockDemo]-[INFO] 2獲取讀訊號量===============
09:39:32 [Lock.DistributedLockDemo]-[INFO] 2釋放讀訊號量===============
09:39:32 [Lock.DistributedLockDemo]-[INFO] 1獲取讀訊號量===============
09:39:37 [Lock.DistributedLockDemo]-[INFO] 1釋放讀訊號量===============
當然可以一次獲取多個訊號量:
@Test public void semaphore() throws Exception { // 建立一個訊號量, Curator 以公平鎖的方式進行實現 final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, lockPath, 3); final CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new Runnable() { @Override public void run() { // 獲取鎖物件 try { // 獲取2個許可 Collection<Lease> acquire = semaphore.acquire(2); logger.info("1獲取讀訊號量==============="); Thread.sleep(5 * 1000); semaphore.returnAll(acquire); logger.info("1釋放讀訊號量==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 獲取鎖物件 try { // 獲取1個許可 Collection<Lease> acquire = semaphore.acquire(1); logger.info("2獲取讀訊號量==============="); Thread.sleep(5 * 1000); semaphore.returnAll(acquire); logger.info("2釋放讀訊號量==============="); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); countDownLatch.await(); }
結果:
09:46:53 [Lock.DistributedLockDemo]-[INFO] 2獲取讀訊號量===============
09:46:53 [Lock.DistributedLockDemo]-[INFO] 1獲取讀訊號量===============
09:46:58 [Lock.DistributedLockDemo]-[INFO] 2釋放讀訊號量===============
09:46:58 [Lock.DistributedLockDemo]-[INFO] 1釋放讀訊號量===============
5.多重共享鎖
@Test public void multiLock() throws Exception { // 可重入鎖 final InterProcessLock interProcessLock1 = new InterProcessMutex(client, lockPath); // 不可重入鎖 final InterProcessLock interProcessLock2 = new InterProcessSemaphoreMutex(client2, lockPath); // 建立多重鎖物件 final InterProcessLock lock = new InterProcessMultiLock(Arrays.asList(interProcessLock1, interProcessLock2)); final CountDownLatch countDownLatch = new CountDownLatch(1); new Thread(new Runnable() { @Override public void run() { // 獲取鎖物件 try { // 獲取引數集合中的所有鎖 lock.acquire(); // 因為存在一個不可重入鎖, 所以整個 InterProcessMultiLock 不可重入 System.out.println(lock.acquire(2, TimeUnit.SECONDS)); // interProcessLock1 是可重入鎖, 所以可以繼續獲取鎖 System.out.println(interProcessLock1.acquire(2, TimeUnit.SECONDS)); // interProcessLock2 是不可重入鎖, 所以獲取鎖失敗 System.out.println(interProcessLock2.acquire(2, TimeUnit.SECONDS)); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); countDownLatch.await(); }
結果:
false
true
false