JUC鎖框架原始碼閱讀-AQS+Zookeeper實現分散式鎖
阿新 • • 發佈:2021-09-10
介紹
1.建立一個永久節點
2.競爭鎖的時候同樣的的key 所有執行緒都往永久節點插入指定key name的臨時節點(節點不允許重複只會有一個插入成功)
3.插入失敗的開啟對永久節點的監聽
4.當時獲得鎖的執行緒down機或者刪除會觸發監聽。然後嘗試獲取CLH第一個執行緒節點 嘗試重新獲取鎖
程式碼已上傳github:https://github.com/aa310958153/zookeeper-lock
注:僅僅用於熟悉AQS如果要用到Zookeeper分散式鎖直接使用Curator基於對Zookeeper鎖的實現
InterProcessMutex:分散式可重入排它鎖
InterProcessSemaphoreMutex:分散式排它鎖
InterProcessReadWriteLock:分散式讀寫鎖
InterProcessMultiLock:將多個鎖作為單個實體管理的容器
加鎖
@Override protected boolean tryAcquire(int acquires) { //獲取當前執行緒 final Thread current = Thread.currentThread(); //獲取鎖狀態 int c = getState(); //等於0表示 當前空閒狀態可以嘗試獲取 <1>zkLock加鎖 if (c == 0) { if (zkLock()&&compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current);return true; } } //可重入判斷 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc);return true; } return false; }
<1>zkLock
/** * zookeeper儲存臨時節點實現加鎖 * @return */ public boolean zkLock() { String path= getLockPath(); boolean haveLock=false; try { curatorFramework .create() .creatingParentContainersIfNeeded() .withMode(CreateMode.EPHEMERAL) .forPath(path, syncValue.get().getValue().getBytes(StandardCharsets.UTF_8)); haveLock= true; } catch (org.apache.zookeeper.KeeperException.NodeExistsException e) {//重複的標識未獲取到鎖 haveLock=false; } catch (Exception e) { e.printStackTrace(); haveLock= false; } /** * 未獲取到鎖監聽永久節點 */ if(!haveLock){ TreeCache treeCache = new TreeCache(CuratorClient.getCurator(), SYN_SWITCH_ZK_NODE); try { treeCache.start(); } catch (Exception e) { e.printStackTrace(); } if(!syncValue.get().isAddListener) { treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { ChildData eventData = event.getData(); switch (event.getType()) { case NODE_ADDED: //System.out.println(path + "節點新增:" + eventData.getPath() + "\t新增資料為:" + new String(eventData.getData())); break; case NODE_UPDATED: // System.out.println(eventData.getPath() + "節點資料更新\t更新資料為:" + new String(eventData.getData()) + "\t版本為:" + eventData.getStat().getVersion()); break; case NODE_REMOVED: //監聽到節點刪除。表示鎖正常釋放 或者持有鎖的服務斷開連線 //獲得第一個阻塞執行緒 喚醒 嘗試獲取鎖 Thread firstThread=getFirstQueuedThread(); if( firstThread!=null) { LockSupport.unpark(firstThread); } break; default: break; } } }); syncValue.get().setAddListener(true); } } return haveLock; }
釋放鎖
@Override protected boolean tryRelease(int releases) { //狀態-1 大於0的數字表示可重入加了多少次鎖 int c = getState() - releases; //如果加鎖執行緒非當前執行緒丟擲異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); //根據節點儲存的值校驗是否是當前執行緒加鎖。如果不是丟擲異常 String value; try { value= new String(curatorFramework.getData().forPath(getLockPath())); } catch (Exception e) { e.printStackTrace(); throw new IllegalMonitorStateException(); } if(value==null||!value.equals(syncValue.get().getValue())){ throw new IllegalMonitorStateException(); } boolean free = false; //當c等於0表示最後一次呼叫unlock 則進行鎖的釋放 if (c == 0) { free = true; //獲得鎖的執行緒設定為null setExclusiveOwnerThread(null); String path= getLockPath(); try { //刪除節點 會觸發節點監聽 curatorFramework.delete().forPath(path); } catch (Exception e) { throw new IllegalMonitorStateException(); } syncValue.remove(); } //設定state setState(c); return free; }