ZooKeeper 實現分散式鎖
阿新 • • 發佈:2018-12-19
實現互斥鎖
package com.zookeeper.lock; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient;View Codeimport org.I0Itec.zkclient.exception.ZkNoNodeException; public class BaseDistributedLock { private final ZkClient client; private final String path; // zookeeper中locker節點的路徑 private final String basePath; private final String lockName; private static final Integer MAX_RETRY_COUNT = 10;public BaseDistributedLock(ZkClient client, String path, String lockName) { this.client = client; this.basePath = path; this.path = path.concat("/").concat(lockName); this.lockName = lockName; } private void deleteOurPath(String ourPath) throws Exception { client.delete(ourPath); }private String createLockNode(ZkClient client, String path) throws Exception { return client.createEphemeralSequential(path, null); } private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; try { while (!haveTheLock) { // 獲取lock節點下的所有節點 List children = getSortedChildren(); String sequenceNodeName = ourPath .substring(basePath.length() + 1); // 獲取當前節點的在所有節點列表中的位置 int ourIndex = children.indexOf(sequenceNodeName); // 節點位置小於0,說明沒有找到節點 if (ourIndex < 0) { throw new ZkNoNodeException("節點沒有找到: " + sequenceNodeName); } // 節點位置大於0說明還有其他節點在當前的節點前面,就需要等待其他的節點都釋放 boolean isGetTheLock = ourIndex == 0; String pathToWatch = (String) (isGetTheLock ? 
null : children .get(ourIndex - 1)); if (isGetTheLock) { haveTheLock = true; } else { String previousSequencePath = basePath.concat("/").concat( pathToWatch); final CountDownLatch latch = new CountDownLatch(1); final IZkDataListener previousListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { latch.countDown(); } public void handleDataChange(String dataPath, Object data) throws Exception { // ignore } }; try { // 如果節點不存在會出現異常 client.subscribeDataChanges(previousSequencePath, previousListener); if (millisToWait != null) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if (millisToWait <= 0) { doDelete = true; // timed out - delete our node break; } latch.await(millisToWait, TimeUnit.MICROSECONDS); } else { latch.await(); } } catch (ZkNoNodeException e) { // ignore } finally { client.unsubscribeDataChanges(previousSequencePath, previousListener); } } } } catch (Exception e) { // 發生異常需要刪除節點 doDelete = true; throw e; } finally { // 如果需要刪除節點 if (doDelete) { deleteOurPath(ourPath); } } return haveTheLock; } private String getLockNodeNumber(String str, String lockName) { int index = str.lastIndexOf(lockName); if (index >= 0) { index += lockName.length(); return index <= str.length() ? 
str.substring(index) : ""; } return str; } List<String> getSortedChildren() throws Exception { try { List<String> children = client.getChildren(basePath); Collections.sort(children, new Comparator<String>() { public int compare(String lhs, String rhs) { return getLockNodeNumber(lhs, lockName).compareTo( getLockNodeNumber(rhs, lockName)); } }); return children; } catch (ZkNoNodeException e) { client.createPersistent(basePath, true); return getSortedChildren(); } } protected void releaseLock(String lockPath) throws Exception { deleteOurPath(lockPath); } protected String attemptLock(long time, TimeUnit unit) throws Exception { final long startMillis = System.currentTimeMillis(); final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; int retryCount = 0; // 網路閃斷需要重試一試 while (!isDone) { isDone = true; try { ourPath = createLockNode(client, path); hasTheLock = waitToLock(startMillis, millisToWait, ourPath); } catch (ZkNoNodeException e) { if (retryCount++ < MAX_RETRY_COUNT) { isDone = false; } else { throw e; } } } if (hasTheLock) { return ourPath; } return null; } }
介面類
package com.zookeeper.lock;

import java.util.concurrent.TimeUnit;

/** Contract for a distributed lock. */
public interface DistributedLock {

    /**
     * Acquires the lock, waiting if it is not immediately available.
     *
     * @throws Exception if the lock cannot be acquired
     */
    void acquire() throws Exception;

    /**
     * Acquires the lock, waiting at most the given time.
     *
     * @param time timeout value
     * @param unit unit of {@code time}
     * @return whether the lock was acquired
     * @throws Exception if the attempt fails
     */
    boolean acquire(long time, TimeUnit unit) throws Exception;

    /**
     * Releases the lock.
     *
     * @throws Exception if the lock cannot be released
     */
    void release() throws Exception;
}
測試類
package com.zookeeper.lock; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer; import coom.zookeeperdemo.lock.SimpleDistributedLockMutex; public class TestDistributedLock { public static void main(String[] args) { final ZkClient zkClientExt1 = new ZkClient("192.168.1.105:2181", 5000, 5000, new BytesPushThroughSerializer()); final SimpleDistributedLockMutex mutex1 = new SimpleDistributedLockMutex( zkClientExt1, "/Mutex"); final ZkClient zkClientExt2 = new ZkClient("192.168.1.105:2181", 5000, 5000, new BytesPushThroughSerializer()); final SimpleDistributedLockMutex mutex2 = new SimpleDistributedLockMutex( zkClientExt2, "/Mutex"); try { mutex1.acquire(); System.out.println("Client1 locked"); Thread client2Thd = new Thread(new Runnable() { public void run() { try { mutex2.acquire(); System.out.println("Client2 locked"); mutex2.release(); System.out.println("Client2 released lock"); } catch (Exception e) { e.printStackTrace(); } } }); client2Thd.start(); Thread.sleep(5000); mutex1.release(); System.out.println("Client1 released lock"); client2Thd.join(); } catch (Exception e) { e.printStackTrace(); } } }View Code
原文出處已不可考