zookeeper 分布式鎖
阿新 • • 發佈:2017-08-23
zookeeper 分布式鎖
分布式鎖有很多,redis也可以實現分布式鎖,
http://shangdc.blog.51cto.com/10093778/1914852(查看redis的分布式鎖)
zookeeper分布式鎖步驟:
1、zookeeper是一個帶有節點的,類似於文件目錄,所以我們把鎖抽象成目錄,zookeeper有一個EPHEMERAL_SEQUENTIAL類型的節點, 多個線程再zookeeper創建的節點的時候,它會幫我們安排好順序進行創建,所以這個節點下的目錄都是順序的。
2、獲取當前目錄的最小的節點,判斷最小節點是不是當前的自己的節點,如果是說明獲取鎖成功了,如果不是獲取鎖失敗了。
3、當獲取鎖的時候失敗了,為了避免驚群效應,你要做的就是獲取當前自己的節點的上一個節點,然後對該節點進行監聽,當上一個節點刪除的時候,會觸發這個監聽,通知該節點。
4、這麽做,釋放鎖的時候,也會通知下一個節點。
什麽是驚群效應:理解為肉少狼多,當一個節點刪除的時候,凡是訂閱了此節點的watcha的監聽都會重新獲取鎖,都要去爭奪,如果數量少還好,當數量很大的時候這種設計就是不合理也是浪費資源。
zookeeper的狀態和事件類型,提前了解一下。
狀態 KeeperState.Disconnected (0) 斷開 * KeeperState.SyncConnected (3) 同步連接狀態 * KeeperState.AuthFailed (4) 認證失敗狀態 * KeeperState.ConnectedReadOnly (5) 只讀連接狀態 * KeeperState.SaslAuthenticated (6) SASL認證通過狀態 * KeeperState.Expired (-112) 過期狀態 * * // EventType 是事件類型 主要關註 Create Delete DataChanged ChildrenChanged * EventType.None (-1), 無 * EventType.NodeCreated (1), * EventType.NodeDeleted (2), * EventType.NodeDataChanged (3), 結點數據變化 * EventType.NodeChildrenChanged (4); 結點子節點變化
下面是代碼,自己敲下,理解一下。
package com.lhcis.spider.system.annotation; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author sdc * */ public class ZooDistributeLock implements Watcher { private static final Logger LOG = LoggerFactory.getLogger(ZooDistributeLock.class); private static final String LOCK_PATH = "/zkLock"; // 模擬開啟的線程數 private static final int THREAD_NUM = 5; // 用於等待所有線程都連接成功後再執行任務 private static CountDownLatch startFlag = new CountDownLatch(1); // 用於確保所有線程執行完畢 private static CountDownLatch threadFlag = new CountDownLatch(THREAD_NUM); private ZooKeeper zk = null; private String currentPath; private String lockPath; public static void main(String[] args) { for (int i = 0; i < THREAD_NUM; i++) { final int j = i; new Thread() { @Override public void run() { ZooDistributeLock zooDistributeLock = new ZooDistributeLock(); try { zooDistributeLock.connection(); System.out.println("連接" + j); zooDistributeLock.createNode(); System.out.println("創建" + j); zooDistributeLock.getLock(); System.out.println("獲取鎖" + j); } catch (IOException | InterruptedException | KeeperException e) { e.printStackTrace(); } } }.start(); } try { threadFlag.await(); LOG.info("所有線程執行完畢..."); } catch (InterruptedException e) { LOG.error(e.getMessage(), e); } } /** * Disconnected為網絡閃斷時觸發的事件,當然其他的拔掉網線、kill zookeeper server ,kill zk * connection也會觸發該事件。 SyncConnected為client端重新選擇下一個zk * server連接觸發的事件,此時watcher有效,也就是能正常感知 * Expired為客戶端重新連server時,服務端發現該session超過了設定的時長,返回給client * Expired,此時watcher失效,也就是不能正常感知 */ @Override public void process(WatchedEvent event) { Event.KeeperState state = event.getState(); Event.EventType type = event.getType(); if (Event.KeeperState.SyncConnected == state) { if (Event.EventType.None == type) { // 標識連接成功 LOG.info("成功連接上ZK服務器"); startFlag.countDown(); } if (Event.EventType.NodeDeleted == type && event.getPath().equals(this.lockPath)) { LOG.info("node:" + this.lockPath + "的鎖已經被釋放"); try { // 上一個節點釋放了,當前節點去獲取鎖 getLock(); } catch (KeeperException | InterruptedException e) { LOG.error(e.getMessage(), e); } } } } /** * 連接到 ZK * * @throws IOException */ private void connection() throws IOException, InterruptedException { zk = new ZooKeeper("127.0.0.1:2181", 5000, this); // 等待連接成功後再執行下一步操作 startFlag.await(); } // 創建節點,並初始化當前路徑 private void createNode() throws KeeperException, InterruptedException, UnsupportedEncodingException { this.currentPath = this.zk.create(LOCK_PATH, "".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } private void getLock() throws KeeperException, InterruptedException { if (minNode()) { doSomething(); // 釋放鎖 releaseLock(); } } /** * 當前是否為最小節點 * * @return */ private boolean minNode() { // 當前序號 try { initLockPath(); // 判斷前一個節點存在不存在,如果存在,則表示當前節點不是最小節點 // zk.getData(this.lockPath, this, new Stat()); zk.getData(this.lockPath, true, new Stat()); LOG.info(this.currentPath + " 不是最小值,沒有獲取鎖,等待 " + this.lockPath + " 釋放鎖"); return false; } catch (KeeperException e) { LOG.info(this.currentPath + " 是最小值,獲得鎖"); return true; } catch (InterruptedException e) { LOG.error(e.getMessage(), e); } return true; } private void doSomething() { LOG.info("處理業務邏輯..."); } /** * 釋放鎖並關閉連接 * * @throws KeeperException * @throws InterruptedException */ private void releaseLock() throws KeeperException, InterruptedException { Thread.sleep(2000); if (this.zk != null) { LOG.info(this.currentPath + " 業務處理完畢,釋放鎖..."); zk.delete(this.currentPath, -1); this.zk.close(); LOG.info(Thread.currentThread().getName() + "關閉 zookeeper 連接"); } threadFlag.countDown(); } /** * 初始化 lockpath */ private void initLockPath() { int currentSeq = Integer.parseInt(this.currentPath.substring(LOCK_PATH.length())); // 上一個序號 int preSeq = currentSeq - 1; String preSeqStr = String.valueOf(preSeq); while (preSeqStr.length() < 10) { preSeqStr = "0" + preSeqStr; } this.lockPath = LOCK_PATH + preSeqStr; } }
參考代碼:
https://juejin.im/entry/596438bc6fb9a06bb47495f1
本文出自 “不積跬步無以至千裏” 博客,請務必保留此出處http://shangdc.blog.51cto.com/10093778/1958619
zookeeper 分布式鎖