基於zookeeper簡單實現分布式鎖
主要思想: 1、開啟10個線程。在disLocks節點下各自創建名為sub的EPHEMERAL_SEQUENTIAL節點。 2、獲取disLocks節點下全部子節點,排序,假設自己的節點編號最小,則獲取鎖; 3、否則watch排在自己前面的節點,監聽到其刪除後,進入第2步(又一次檢測排序是防止監聽的節點發生連接失效。導致的節點刪除情況); 4、刪除自身sub節點,釋放連接;
這裏插播下zookeeper的4種節點類型:
public enum CreateMode { /** * 持久節點:節點創建後。會一直存在,不會因client會話失效而刪除; */ PERSISTENT (0, false, false), /** * 持久順序節點:基本特性與持久節點一致。創建節點的過程中。zookeeper會在其名字後自己主動追加一個單調增長的數字後綴,作為新的節點名。 */ PERSISTENT_SEQUENTIAL (2, false, true), /** * 暫時節點:client會話失效或連接關閉後,該節點會被自己主動刪除。且不能再暫時節點以下創建子節點,否則報例如以下錯:org.apache.zookeeper.KeeperException$NoChildrenForEphemeralsException; */ EPHEMERAL (1, true, false), /** * 暫時順序節點:基本特性與暫時節點一致。創建節點的過程中,zookeeper會在其名字後自己主動追加一個單調增長的數字後綴。作為新的節點名; */ EPHEMERAL_SEQUENTIAL (3, true, true); private static final Logger LOG = LoggerFactory.getLogger(CreateMode.class); private boolean ephemeral; private boolean sequential; private int flag; CreateMode(int flag, boolean ephemeral, boolean sequential) { this.flag = flag; this.ephemeral = ephemeral; this.sequential = sequential; } public boolean isEphemeral() { return ephemeral; } public boolean isSequential() { return sequential; } public int toFlag() { return flag; } static public CreateMode fromFlag(int flag) throws KeeperException { switch(flag) { case 0: return CreateMode.PERSISTENT; case 1: return CreateMode.EPHEMERAL; case 2: return CreateMode.PERSISTENT_SEQUENTIAL; case 3: return CreateMode.EPHEMERAL_SEQUENTIAL ; default: LOG.error("Received an invalid flag value to convert to a CreateMode"); throw new KeeperException.BadArgumentsException(); } } }
測試代碼:
package zookeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.util.List; import java.io.IOException; import java.util.Collections; import java.util.concurrent.CountDownLatch; public class DistributedLock implements Watcher{ private int threadId; private ZooKeeper zk = null; private String selfPath; private String waitPath; private String LOG_PREFIX_OF_THREAD; private static final int SESSION_TIMEOUT = 10000; private static final String GROUP_PATH = "/disLocks"; private static final String SUB_PATH = "/disLocks/sub"; private static final String CONNECTION_STRING = "192.168.*.*:2181"; private static final int THREAD_NUM = 10; //確保連接zk成功。 private CountDownLatch connectedSemaphore = new CountDownLatch(1); //確保全部線程執行結束; private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM); private static final Logger LOG = LoggerFactory.getLogger(AllZooKeeperWatcher.class); public DistributedLock(int id) { this.threadId = id; LOG_PREFIX_OF_THREAD = "【第"+threadId+"個線程】"; } public static void main(String[] args) { for(int i=0; i < THREAD_NUM; i++){ final int threadId = i+1; new Thread(){ @Override public void run() { try{ DistributedLock dc = new DistributedLock(threadId); dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT); //GROUP_PATH不存在的話,由一個線程創建就可以; synchronized (threadSemaphore){ dc.createPath(GROUP_PATH, "該節點由線程" + threadId + "創建", true); } dc.getLock(); } catch (Exception e){ LOG.error("【第"+threadId+"個線程】 拋出的異常:"); e.printStackTrace(); } } }.start(); } try { threadSemaphore.await(); LOG.info("全部線程執行結束!"); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 獲取鎖 * @return */ private void getLock() throws KeeperException, InterruptedException { selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); LOG.info(LOG_PREFIX_OF_THREAD+"創建鎖路徑:"+selfPath); if(checkMinPath()){ getLockSuccess(); } } /** * 創建節點 * @param path 節點path * @param data 初始數據內容 * @return */ public boolean createPath( String path, String data, boolean needWatch) throws KeeperException, InterruptedException { if(zk.exists(path, needWatch)==null){ LOG.info( LOG_PREFIX_OF_THREAD + "節點創建成功, Path: " + this.zk.create( path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT ) + ", content: " + data ); } return true; } /** * 創建ZK連接 * @param connectString ZKserver地址列表 * @param sessionTimeout Session超時時間 */ public void createConnection( String connectString, int sessionTimeout ) throws IOException, InterruptedException { zk = new ZooKeeper( connectString, sessionTimeout, this); connectedSemaphore.await(); } /** * 獲取鎖成功 */ public void getLockSuccess() throws KeeperException, InterruptedException { if(zk.exists(this.selfPath,false) == null){ LOG.error(LOG_PREFIX_OF_THREAD+"本節點已不在了..."); return; } LOG.info(LOG_PREFIX_OF_THREAD + "獲取鎖成功,趕緊幹活!"); Thread.sleep(2000); LOG.info(LOG_PREFIX_OF_THREAD + "刪除本節點:"+selfPath); zk.delete(this.selfPath, -1); releaseConnection(); threadSemaphore.countDown(); } /** * 關閉ZK連接 */ public void releaseConnection() { if ( this.zk !=null ) { try { this.zk.close(); } catch ( InterruptedException e ) {} } LOG.info(LOG_PREFIX_OF_THREAD + "釋放連接"); } /** * 檢查自己是不是最小的節點 * @return */ public boolean checkMinPath() throws KeeperException, InterruptedException { List<String> subNodes = zk.getChildren(GROUP_PATH, false); Collections.sort(subNodes); int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1)); switch (index){ case -1:{ LOG.error(LOG_PREFIX_OF_THREAD+"本節點已不在了..."+selfPath); return false; } case 0:{ LOG.info(LOG_PREFIX_OF_THREAD+"子節點中,我果然是老大"+selfPath); return true; } default:{ this.waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1); LOG.info(LOG_PREFIX_OF_THREAD+"獲取子節點中,排在我前面的"+waitPath); try{ zk.getData(waitPath, true, new Stat()); return false; }catch(KeeperException e){ if(zk.exists(waitPath,false) == null){ LOG.info(LOG_PREFIX_OF_THREAD+"子節點中,排在我前面的"+waitPath+"已失蹤,幸福來得太突然?"); return checkMinPath(); }else{ throw e; } } } } } @Override public void process(WatchedEvent event) { if(event == null){ return; } Event.KeeperState keeperState = event.getState(); Event.EventType eventType = event.getType(); if ( Event.KeeperState.SyncConnected == keeperState) { if ( Event.EventType.None == eventType ) { LOG.info( LOG_PREFIX_OF_THREAD + "成功連接上ZKserver" ); connectedSemaphore.countDown(); }else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) { LOG.info(LOG_PREFIX_OF_THREAD + "收到情報。排我前面的家夥已掛,我是不是能夠出山了?"); try { if(checkMinPath()){ getLockSuccess(); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }else if ( Event.KeeperState.Disconnected == keeperState ) { LOG.info( LOG_PREFIX_OF_THREAD + "與ZKserver斷開連接" ); } else if ( Event.KeeperState.AuthFailed == keeperState ) { LOG.info( LOG_PREFIX_OF_THREAD + "權限檢查失敗" ); } else if ( Event.KeeperState.Expired == keeperState ) { LOG.info( LOG_PREFIX_OF_THREAD + "會話失效" ); } } }
log配置文件:
# DEFAULT log4j.rootLogger=INFO,CONSOLE # # Log INFO level and above messages to the console # log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender log4j.appender.CONSOLE.Threshold=INFO log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %m%n log4j.appender.COMMONSTAT=org.apache.log4j.DailyRollingFileAppender log4j.appender.COMMONSTAT.Threshold=INFO log4j.appender.COMMONSTAT.File=/home/zookeeper/zookeeper-test-agent/logs/test.log log4j.appender.COMMONSTAT.DatePattern=‘.‘yyyy-MM-dd log4j.appender.COMMONSTAT.layout=org.apache.log4j.PatternLayout log4j.appender.COMMONSTAT.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] - %m%n log4j.logger.org.displaytag=WARN log4j.logger.org.apache.zookeeper=ERROR log4j.logger.org.springframework=WARN log4j.logger.org.I0Itec=WARN log4j.logger.commonStat=INFO,COMMONSTAT
執行結果:
2014-11-19 11:34:10,894 - 【第9個線程】成功連接上ZKserver 2014-11-19 11:34:10,895 - 【第8個線程】成功連接上ZKserver 2014-11-19 11:34:10,894 - 【第1個線程】成功連接上ZKserver 2014-11-19 11:34:10,894 - 【第7個線程】成功連接上ZKserver 2014-11-19 11:34:10,894 - 【第4個線程】成功連接上ZKserver 2014-11-19 11:34:10,895 - 【第5個線程】成功連接上ZKserver 2014-11-19 11:34:10,896 - 【第2個線程】成功連接上ZKserver 2014-11-19 11:34:10,894 - 【第10個線程】成功連接上ZKserver 2014-11-19 11:34:10,894 - 【第3個線程】成功連接上ZKserver 2014-11-19 11:34:10,895 - 【第6個線程】成功連接上ZKserver 2014-11-19 11:34:10,910 - 【第9個線程】節點創建成功, Path: /disLocks, content: 該節點由線程9創建 2014-11-19 11:34:10,912 - 【第9個線程】創建鎖路徑:/disLocks/sub0000000000 2014-11-19 11:34:10,917 - 【第6個線程】創建鎖路徑:/disLocks/sub0000000001 2014-11-19 11:34:10,917 - 【第9個線程】子節點中。我果然是老大/disLocks/sub0000000000 2014-11-19 11:34:10,921 - 【第3個線程】創建鎖路徑:/disLocks/sub0000000002 2014-11-19 11:34:10,922 - 【第6個線程】獲取子節點中。排在我前面的/disLocks/sub0000000000 2014-11-19 11:34:10,923 - 【第9個線程】獲取鎖成功,趕緊幹活!2014-11-19 11:34:10,924 - 【第10個線程】創建鎖路徑:/disLocks/sub0000000003 2014-11-19 11:34:10,924 - 【第3個線程】獲取子節點中。排在我前面的/disLocks/sub0000000001 2014-11-19 11:34:10,928 - 【第10個線程】獲取子節點中,排在我前面的/disLocks/sub0000000002 2014-11-19 11:34:10,929 - 【第1個線程】創建鎖路徑:/disLocks/sub0000000004 2014-11-19 11:34:10,932 - 【第5個線程】創建鎖路徑:/disLocks/sub0000000005 2014-11-19 11:34:10,935 - 【第1個線程】獲取子節點中。排在我前面的/disLocks/sub0000000003 2014-11-19 11:34:10,936 - 【第2個線程】創建鎖路徑:/disLocks/sub0000000006 2014-11-19 11:34:10,936 - 【第5個線程】獲取子節點中,排在我前面的/disLocks/sub0000000004 2014-11-19 11:34:10,940 - 【第4個線程】創建鎖路徑:/disLocks/sub0000000007 2014-11-19 11:34:10,941 - 【第2個線程】獲取子節點中,排在我前面的/disLocks/sub0000000005 2014-11-19 11:34:10,943 - 【第8個線程】創建鎖路徑:/disLocks/sub0000000008 2014-11-19 11:34:10,944 - 【第4個線程】獲取子節點中。排在我前面的/disLocks/sub0000000006 2014-11-19 11:34:10,945 - 【第7個線程】創建鎖路徑:/disLocks/sub0000000009 2014-11-19 11:34:10,946 - 【第8個線程】獲取子節點中。排在我前面的/disLocks/sub0000000007 2014-11-19 11:34:10,947 - 【第7個線程】獲取子節點中。排在我前面的/disLocks/sub0000000008 2014-11-19 11:34:12,923 - 【第9個線程】刪除本節點:/disLocks/sub0000000000 2014-11-19 11:34:12,926 - 【第6個線程】收到情報,排我前面的家夥已掛。我是不是能夠出山了? 2014-11-19 11:34:12,928 - 【第6個線程】子節點中。我果然是老大/disLocks/sub0000000001 2014-11-19 11:34:12,930 - 【第9個線程】釋放連接 2014-11-19 11:34:12,930 - 【第6個線程】獲取鎖成功,趕緊幹活。 2014-11-19 11:34:14,930 - 【第6個線程】刪除本節點:/disLocks/sub0000000001 2014-11-19 11:34:14,937 - 【第3個線程】收到情報,排我前面的家夥已掛,我是不是能夠出山了? 2014-11-19 11:34:14,941 - 【第3個線程】子節點中,我果然是老大/disLocks/sub0000000002 2014-11-19 11:34:14,943 - 【第6個線程】釋放連接 2014-11-19 11:34:14,946 - 【第3個線程】獲取鎖成功,趕緊幹活。 2014-11-19 11:34:16,946 - 【第3個線程】刪除本節點:/disLocks/sub0000000002 2014-11-19 11:34:16,949 - 【第10個線程】收到情報,排我前面的家夥已掛,我是不是能夠出山了? 2014-11-19 11:34:16,951 - 【第10個線程】子節點中。我果然是老大/disLocks/sub0000000003 2014-11-19 11:34:16,953 - 【第3個線程】釋放連接 2014-11-19 11:34:16,953 - 【第10個線程】獲取鎖成功。趕緊幹活! 2014-11-19 11:34:18,953 - 【第10個線程】刪除本節點:/disLocks/sub0000000003 2014-11-19 11:34:18,957 - 【第1個線程】收到情報,排我前面的家夥已掛,我是不是能夠出山了? 2014-11-19 11:34:18,960 - 【第10個線程】釋放連接 2014-11-19 11:34:18,961 - 【第1個線程】子節點中,我果然是老大/disLocks/sub0000000004 2014-11-19 11:34:18,964 - 【第1個線程】獲取鎖成功,趕緊幹活。 2014-11-19 11:34:20,964 - 【第1個線程】刪除本節點:/disLocks/sub0000000004 2014-11-19 11:34:20,967 - 【第5個線程】收到情報。排我前面的家夥已掛,我是不是能夠出山了? 2014-11-19 11:34:20,969 - 【第5個線程】子節點中,我果然是老大/disLocks/sub0000000005 2014-11-19 11:34:20,971 - 【第1個線程】釋放連接 2014-11-19 11:34:20,971 - 【第5個線程】獲取鎖成功,趕緊幹活!
2014-11-19 11:34:22,971 - 【第5個線程】刪除本節點:/disLocks/sub0000000005 2014-11-19 11:34:22,974 - 【第2個線程】收到情報。排我前面的家夥已掛,我是不是能夠出山了? 2014-11-19 11:34:22,978 - 【第2個線程】子節點中,我果然是老大/disLocks/sub0000000006 2014-11-19 11:34:22,979 - 【第5個線程】釋放連接 2014-11-19 11:34:22,981 - 【第2個線程】獲取鎖成功,趕緊幹活! 2014-11-19 11:34:24,981 - 【第2個線程】刪除本節點:/disLocks/sub0000000006 2014-11-19 11:34:24,985 - 【第4個線程】收到情報。排我前面的家夥已掛,我是不是能夠出山了? 2014-11-19 11:34:24,989 - 【第2個線程】釋放連接 2014-11-19 11:34:24,989 - 【第4個線程】子節點中,我果然是老大/disLocks/sub0000000007 2014-11-19 11:34:24,995 - 【第4個線程】獲取鎖成功,趕緊幹活!
2014-11-19 11:34:26,995 - 【第4個線程】刪除本節點:/disLocks/sub0000000007 2014-11-19 11:34:26,998 - 【第8個線程】收到情報。排我前面的家夥已掛。我是不是能夠出山了? 2014-11-19 11:34:27,000 - 【第8個線程】子節點中,我果然是老大/disLocks/sub0000000008 2014-11-19 11:34:27,004 - 【第8個線程】獲取鎖成功,趕緊幹活!
2014-11-19 11:34:27,004 - 【第4個線程】釋放連接 2014-11-19 11:34:29,004 - 【第8個線程】刪除本節點:/disLocks/sub0000000008 2014-11-19 11:34:29,007 - 【第7個線程】收到情報。排我前面的家夥已掛,我是不是能夠出山了? 2014-11-19 11:34:29,009 - 【第7個線程】子節點中。我果然是老大/disLocks/sub0000000009 2014-11-19 11:34:29,010 - 【第8個線程】釋放連接 2014-11-19 11:34:29,011 - 【第7個線程】獲取鎖成功,趕緊幹活! 2014-11-19 11:34:31,011 - 【第7個線程】刪除本節點:/disLocks/sub0000000009 2014-11-19 11:34:31,017 - 【第7個線程】釋放連接 2014-11-19 11:34:31,017 - 全部線程執行結束!
基於zookeeper簡單實現分布式鎖