Zookeeper 實現分散式鎖(樂觀和悲觀)
說明:
做備忘用,大家之言彙總到一起。
Jar
<!-- zkclient依賴 -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
zookeeper基礎鞏固
ZooKeeper 節點是有生命週期的,這取決於節點的型別。在 ZooKeeper 中,節點型別可以分為持久節點(PERSISTENT )、臨時節點(EPHEMERAL),以及時序節點(SEQUENTIAL ),具體在節點建立過程中,一般是組合使用,可以生成以下 4 種節點型別。
持久節點(PERSISTENT)
所謂持久節點,是指在節點建立後,就一直存在,直到有刪除操作來主動清除這個節點——不會因為建立該節點的客戶端會話失效而消失。
持久順序節點(PERSISTENT_SEQUENTIAL)
這類節點的基本特性和上面的節點型別是一致的。額外的特性是,在ZK中,每個父節點會為他的第一級子節點維護一份時序,會記錄每個子節點建立的先後順序。基於這個特性,在建立子節點的時候,可以設定這個屬性,那麼在建立節點過程中,ZK會自動為給定節點名加上一個數字字尾,作為新的節點名。這個數字字尾的範圍是整型的最大值。
臨時節點(EPHEMERAL)
和持久節點不同的是,臨時節點的生命週期和客戶端會話繫結。也就是說,如果客戶端會話失效,那麼這個節點就會自動被清除掉。注意,這裡提到的是會話失效,而非連線斷開。另外,在臨時節點下面不能建立子節點。
臨時順序節點(EPHEMERAL_SEQUENTIAL)
可以用來實現分散式鎖
程式碼
業務程式碼-模擬併發下生成id
package com.dongnao.lock;
import java.text.SimpleDateFormat;
import java.util.Date;
public class OrderCodeGenerator {
//自增長序列
private static int i =0;
//按照“年-月-日-小時-分鐘-秒-自增長序列”的規則生成訂單編號
public String getOrderCode (){
Date now = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-");
return sdf.format(now)+ ++i;
}
public static void main(String[] args) {
OrderCodeGenerator ong = new OrderCodeGenerator();
for (int i = 0; i < 10; i++) {
System.out.println(ong.getOrderCode());
}
}
}
模擬100個執行緒去建立訂單id
程式碼說明:這裡我們用的java的發令槍來模擬併發CountDownLatch ,主函式執行 所有的執行緒都處於阻塞狀態 cdl.await();當 cdl.countDown();執行之後,所有執行緒開始併發執行 createOrder() ; 該方法中會用到 lock.lock(); 該lock 物件我們提供了三種例項,方式1是java自帶的,非分散式的。方式2,3是我們利用zookeeper 來實現的,這裡會貼出 方式2,3的具體程式碼,也會對比著去分析方式3差在那裡,如何優化到方式2這用利用zookeeper來實現分散式鎖,進而投入生產。
package com.dongnao.lock;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OrderServiceImpl implements Runnable {
private static OrderCodeGenerator ong = new OrderCodeGenerator();
private Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class);
private static final int NUM = 100;
// 按照執行緒數初始化倒計數器
private static CountDownLatch cdl = new CountDownLatch(NUM);
// private static Lock lock = new ReentrantLock(); 加鎖方式1
// private Lock lock = new ZookeeperImproveLock(); 加鎖方式2
private Lock lock = new ZookeeperLock(); 加鎖方式3
// 建立訂單介面
public void createOrder() {
String orderCode = null;
lock.lock();
try {
// 獲取訂單編號
orderCode = ong.getOrderCode();
} catch (Exception e) {
// TODO: handle exception
}finally{
lock.unlock();
}
// ……業務程式碼,此處省略100行程式碼
logger.info(Thread.currentThread().getName()
+ " =======================>" + orderCode);
}
@Override
public void run() {
try {
// 等待其他執行緒初始化
cdl.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 建立訂單
createOrder();
}
public static void main(String[] args) {
for (int i = 1; i <= NUM; i++) {
// 按照執行緒數迭代例項化執行緒
new Thread(new OrderServiceImpl()).start();
// 建立一個執行緒,倒計數器減1
cdl.countDown();
}
}
}
lock 物件 方式3
lock()方法是呼叫的入口,它去呼叫tryLock() 嘗試獲取鎖和阻塞其他執行緒,tryLock()中去建立持久節點LOCK,之前介紹過,持久節點只能有一個,所以其他執行緒去建立的時候,會丟擲ZkNodeExistsException 異常,tryLock()是非阻塞的,捕獲異常我們返回false, 在 lock() 中呼叫waitForLock(); 去阻塞執行緒和對LOCK節點的監聽,當鎖釋放了,繼續呼叫 lock(); 再去競爭鎖(遞迴)。
package com.dongnao.lock;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZookeeperLock implements Lock {
private static final String ZK_IP_PROT = "localhost:2181";
// private static final String ZK_IP_PROT = "13.206.6.232:2181";
private static final String LOCK_NODE = "/LOCK";
private ZkClient client = new ZkClient(ZK_IP_PROT);
private static final Logger logger = LoggerFactory.getLogger(ZookeeperLock.class);
private CountDownLatch cdl=null;
@Override
//阻塞的方式去獲取鎖
public void lock() {
if(tryLock()){
logger.info("=============get lock success==============");
}else{
waitForLock();
lock();
}
}
@Override
//通過新建節點的方式去嘗試加鎖 非阻塞
public boolean tryLock() {
try {
client.createPersistent(LOCK_NODE);
return true;
} catch (ZkNodeExistsException e) {
return false;
}
}
@Override
public void unlock() {
client.delete(LOCK_NODE);
}
private void waitForLock() {
//1.建立一個監聽
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
//3.當其他的執行緒釋放鎖,丟擲事件,讓其他執行緒重新競爭鎖
logger.info("=============catch data delete event==============");
if(cdl!=null){
cdl.countDown();
}
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
// TODO Auto-generated method stub
}
};
client.subscribeDataChanges(LOCK_NODE, listener);
//2.如果節點還存在,讓執行緒阻塞
if(client.exists(LOCK_NODE)){
cdl = new CountDownLatch(1);
try {
cdl.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
client.unsubscribeDataChanges(LOCK_NODE, listener);
}
public static void main(String[] args) throws InterruptedException {
final CountDownLatch cdl = new CountDownLatch(1);
ZkClient client = new ZkClient(ZK_IP_PROT);
client.subscribeDataChanges(LOCK_NODE, new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("===============aaa===========");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
});
cdl.await();
}
//--------------------不需要寫邏輯的方法--------------------
@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
}
測試方式3的弊端
方式3不建議投入生產,弊端有兩個:(1)會出現死鎖。(2)基於zookeeper內部機制,所有產生連線的客戶端,當節點LOCK 刪除之後,zookeeper回給所有的客戶端傳送 刪除通知,這嚴重的影響了我們的效能。如果我們有100個客戶端,當拿到鎖的 執行緒去釋放鎖(刪除該節點)之後,zookeeper會通過http 告訴99個客戶端該節點刪除了。
測試步驟1:
用命令 去建立一個LOCK節點,然後就會死鎖。因為對於這100個人來說,他們建立這個節點時發現已經存在了,它會拋異常,捕獲異常之後他們都會阻塞,沒有執行緒會去刪除這個節點,此時100個人永久等待。
執行主函式
啟動一堆執行緒之後,發現所有執行緒都是在阻塞
同理:當一個執行緒建立這個節點之後,伺服器宕機了,網路延遲等導致這個LOCK 節點 沒有合理性的釋放,其他執行緒死鎖。
步驟二,我們命令刪除LOCK來測試第二個弊端
因為,我們在程式碼裡 寫了對 LOCK 節點的監聽client.subscribeDataChanges(LOCK_NODE, listener);所以命令刪除之後,100執行緒正常的去搶佔鎖資源,一切程式恢復正常。如圖
上圖我們會發現: 肉眼可見的所有執行緒在搶鎖,很慢,而且每次釋放鎖(刪除節點),會有 n - 1次通知,n 為當前最大執行緒個數。
方式二解決了以上兩個弊端
(1)我們用臨時節點,這樣就不會死鎖。
(2)我們每個執行緒只監聽他的上一個節點(排序),這樣通知就變為了1 。
程式碼
package com.dongnao.lock;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZookeeperImproveLock implements Lock {
private static final String LOCK_PATH = "/LOCK";
private static final String ZOOKEEPER_IP_PORT = "localhost:2181";
private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 1000, 1000, new SerializableSerializer());
private static Logger logger = LoggerFactory.getLogger(ZookeeperImproveLock.class);
private CountDownLatch cdl;
private String beforePath;// 當前請求的節點
private String currentPath;// 當前請求的節點前一個節點
// 判斷有沒有LOCK目錄,沒有則建立
public ZookeeperImproveLock() {
if (!this.client.exists(LOCK_PATH)) {
this.client.createPersistent(LOCK_PATH);
}
}
public void lock() {
if (!tryLock()) {
waitForLock();
lock();
} else {
logger.info(Thread.currentThread().getName() + " 獲得分散式鎖!");
}
}
/**
* 為當前節點新增 監聽器
*/
private void waitForLock() {
IZkDataListener listener = new IZkDataListener() {
// 刪除的時候去監聽
public void handleDataDeleted(String dataPath) throws Exception {
logger.info(Thread.currentThread().getName() + ":捕獲到DataDelete事件!---------------------------");
if (cdl != null) {
cdl.countDown();
}
}
// 發生改變的時候去監聽
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
// 給之前的節點增加資料刪除的watcher
this.client.subscribeDataChanges(beforePath, listener);
if (this.client.exists(beforePath)) { // 如果這個節點存在
cdl = new CountDownLatch(1);
try {
cdl.await(); // 執行緒就給他阻塞,讓他等
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.client.unsubscribeDataChanges(beforePath, listener);
}
public boolean tryLock() {
// 如果currentPath為空則為第一次嘗試加鎖,第一次加鎖賦值currentPath
if (currentPath == null || currentPath.length() <= 0) {
// 建立一個臨時順序節點
currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
System.out.println("---------------------------->" + currentPath);
}
// 獲取所有臨時節點並排序,臨時節點名稱為自增長的字串如:0000000400
List<String> childrens = this.client.getChildren(LOCK_PATH);
Collections.sort(childrens);
if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {// 如果當前節點在所有節點中排名第一則獲取鎖成功
return true;
} else {// 如果當前節點在所有節點中排名中不是排名第一,則獲取前面的節點名稱,並賦值給beforePath
int wz = Collections.binarySearch(childrens, currentPath.substring(6));
beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
}
return false;
}
public void unlock() {
// 刪除當前臨時節點
client.delete(currentPath);
}
// ===================用不到的實現方法=======================
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub
}
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
}
執行效果:
LOKC下建立有序號的 值
上圖發現秒級搶鎖, 方式2的程式碼就不介紹了,主要就是:獲取該節點下所有的 值,然後排序,取第一個,100個執行緒都有自己的編號,然後跟排序完第一個equals() 比較,肯定只有一個能批對上,其他99個去阻塞等~,再就是監聽上一個節點來保證通知只會發生一次保證效能。
基於Zookeeper實現分散式鎖 已經沒有問題了。多說一嘴,我們上面是通過java的發令槍CountDownLatch 來進行阻塞,實現悲觀鎖,如果我們不阻塞可以實現樂觀鎖。
補充:
該異常是因為客戶端和伺服器部署的zookeeper版本不相容導致,上面介紹過,我們的客戶端支援3.4.8 一下版本的 zookeeper, 以上版本就會出現如下異常。