基於zookeeper實現分散式鎖
引言
在程式開發過程中不得不考慮的就是併發問題。在java中對於同一個jvm而言,jdk已經提供了lock和同步等。但是在分散式情況下,往往存在多個程序對一些資源產生競爭關係,而這些程序往往在不同的機器上,這個時候jdk中提供的已經不能滿足。分散式鎖顧明思議就是可以滿足分散式情況下的併發鎖。 下面我們講解怎麼利用zk實現分散式鎖。
ZooKeeper是一個分散式的,開放原始碼的分散式應用程式協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要元件。它是一個為分散式應用提供一致性服務的軟體,提供的功能包括:配置維護、域名服務、分散式同步、組服務等。
ZooKeeper的架構通過冗餘服務實現高可用性。因此,如果第一次無應答,客戶端就可以詢問另一臺ZooKeeper主機。ZooKeeper節點將它們的資料儲存於一個分層的名稱空間,非常類似於一個檔案系統或一個字首樹結構。客戶端可以在節點讀寫,從而以這種方式擁有一個共享的配置服務。更新是全序的。
基於ZooKeeper分散式鎖的流程
- 在zookeeper指定節點(locks)下建立臨時順序節點node_n
- 獲取locks下所有子節點children
- 對子節點按節點自增序號從小到大排序
- 判斷本節點是不是第一個子節點,若是,則獲取鎖;若不是,則監聽比該節點小的那個節點的刪除事件
- 若監聽事件生效,則回到第二步重新進行判斷,直到獲取到鎖
具體實現
下面就具體使用java和zookeeper實現分散式鎖,操作zookeeper使用的是apache提供的zookeeper的包。
- 通過實現Watch介面,實現process(WatchedEvent
event)方法來實施監控,使CountDownLatch來完成監控,在等待鎖的時候使用CountDownLatch來計數,等到後進行countDown,停止等待,繼續執行。 - 以下整體流程基本與上述描述流程一致,只是在監聽的時候使用的是CountDownLatch來監聽前一個節點。
程式碼部分
整個程式碼結構如圖:
1、首先建立一個介面Lock,顧名思義,大家很容易想到JDK下面有一個Lock的介面,但是這裡我並不打算直接使用這個介面,只是模擬了該接口裡的方法自定義實現,這樣可以按需使用,請看裡面的程式碼,
public interface Lock {
public void getLock();
public void unlock();
}
接口裡面比較簡單,一個是獲取鎖的介面,一個是釋放鎖的介面,
2、再來看AbstratcLock這個類,這是個抽象類,裡面有一個獲取鎖的方法和兩個待實現的抽象方法,
//定義基本模板
public abstract class AbstratcLock implements Lock{
public void getLock() {
if(tryLock()){
System.out.println("##獲取鎖的資源===============");
}else{
waitLock();
getLock();
}
}
public abstract boolean tryLock();
public abstract void waitLock();
}
3、zookeeper的基本配置項在ZookeeperAbstractLock這個類裡面,在這裡,我沒有使用單獨的配置檔案或者類去配置連線zookeeper的配置資訊,就在這個抽象類裡面全部搞定,
public abstract class ZookeeperAbstractLock extends AbstratcLock{
private static String CONNECT_PATH = "127.0.0.1:2181";
protected ZkClient zkClient = new ZkClient(CONNECT_PATH);
protected static final String PATH = "/lock";
protected static final String PATH2 = "/lock2";
}
4、接下來是zookeeper實現具體的分散式鎖的業務邏輯部分,實現的思路前面已經解釋過,再次總結一下就是,zookeeper基於記憶體實現的一種檔案樹節點,一旦某個執行緒成功建立了某個節點,其他執行緒繼續建立同名節點就無法成功,但該執行緒可以註冊一個監聽器,監聽上一個執行緒對該節點的變換情況,通過這個機制來判定對該節點的鎖的持有和釋放,從而實現效果,下面通過兩種方式來實現,
4.1 直接通過一個節點實現分散式鎖,程式碼如下,
public class ZookeeperDistributeLock extends ZookeeperAbstractLock{
private CountDownLatch countDownLatch = null;
//嘗試獲得鎖
@Override
public boolean tryLock() {
try {
zkClient.createEphemeral(PATH);
return true;
} catch (Exception e) {
return false;
}
}
//監聽某個節點,匿名回撥函式實現對節點資訊變化的監聽,
@Override
public void waitLock() {
//一旦zookeeper檢測到節點資訊的變化,就會觸發匿名匿名回撥函式,通知訂閱的客戶端,即zkClient
IZkDataListener iZkDataListener = new IZkDataListener() {
public void handleDataDeleted(String path) throws Exception {
//喚醒被等待的執行緒
if(countDownLatch != null){
countDownLatch.countDown();
}
}
public void handleDataChange(String path, Object data) throws Exception {
}
};
//註冊事件監聽
zkClient.subscribeDataChanges(PATH, iZkDataListener);
//如果節點存在了,則需要等待一直到接收到了事件通知
if(zkClient.exists(PATH)){
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
zkClient.unsubscribeDataChanges(PATH, iZkDataListener);
}
//釋放鎖
public void unlock() {
if(zkClient != null){
zkClient.delete(PATH);
zkClient.close();
System.out.println("釋放鎖資源");
}
}
}
5、下面是測試類,模擬50個執行緒併發生成訂單的動作,OrderService該類,程式碼如下,
public class OrderService implements Runnable{
//訂單號生成類
private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
private Lock lock = new ZookeeperDistributeLock();
//private Lock lock = new ZookeeperDistributeLock2();
public void run() {
getNumber();
}
public void getNumber(){
try {
lock.getLock();
String number = orderNumGenerator.getNumber();
System.out.println(Thread.currentThread().getName() + ",產生了訂單:" + number);
} catch (Exception e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
public static void main(String[] args) {
System.out.println("##生成了訂單####");
for(int i=0;i<50;i++){
new Thread(new OrderService()).start();
}
}
}
將生成訂單的類的程式碼也附上,
public class OrderNumGenerator {
public static int count =0;
private Lock lock = new ReentrantLock();
public String getNumber(){
try {
lock.lock();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
return sdf.format(new Date()) + "_" + ++count;
} finally {
lock.unlock();
}
}
}
下面執行上述的OrderService的main函式,看控制檯輸出結果,
上述50個執行緒很快就執行完畢了,而且沒有出現任何問題,我們知道zookeeper是基於記憶體型的分散式協調伺服器,大家有沒有發現,我這裡只建立了一個檔案節點,就可以實現效果,這主要是得益於記憶體操作的快速,但是問題來了,如果執行緒數量足夠多,等於是某個執行緒釋放了鎖,其他的執行緒一起去爭奪鎖,那樣,不管記憶體的執行速度有多快,總會有併發爭奪的時候出現,所以下面演示用zookeeper的臨時有序節點的方式實現,
6、實現的步驟上面已經說明,這裡再說一下,主要步驟是:
- 建立一個節點,這裡為:lock2 。節點型別為持久節點(PERSISTENT)
- 每當程序需要訪問共享資源時,會呼叫分散式鎖的lock()或tryLock()方法獲得鎖,這個時候會在第一步建立的lock節點下建立相應的順序子節點,節點型別為臨時順序節點(EPHEMERAL_SEQUENTIAL),由於在這樣的節點模式下,繼續建立同名節點,會直接在該節點下生成一個有序的臨時子節點編號,從小到大,依次排序;
- 在建立子節點後,對lock下面的所有以name開頭的子節點進行排序,判斷剛剛建立的子節點順序號是否是最小的節點,假如是最小節點,則獲得該鎖對資源進行訪問。
- 假如不是該節點,就獲得該節點的上一順序節點,並給該節點是否存在註冊監聽事件。同時在這裡阻塞。等待監聽事件的發生,獲得鎖控制權。
- 當呼叫完共享資源後,呼叫unlock()方法,關閉zk,進而可以引發監聽事件,釋放該鎖。
程式碼如下,
public class ZookeeperDistributeLock2 extends ZookeeperAbstractLock{
private CountDownLatch countDownLatch = null;
private String beforePath; //前一個節點
private String currentPath; //當前節點
//初始化主節點,如果不存在則建立
public ZookeeperDistributeLock2(){
if(!this.zkClient.exists(PATH2)){
this.zkClient.createPersistent(PATH2);
}
}
@Override
public boolean tryLock() {
//基於lock2節點,新建一個臨時節點
if(currentPath == null || currentPath.length() <= 0){
currentPath = this.zkClient.createEphemeralSequential(PATH2 + "/", beforePath);
}
//獲取所有臨時節點並進行排序
List<String> children = this.zkClient.getChildren(PATH2);
Collections.sort(children);
if(currentPath.equals(PATH2 + "/" + children.get(0))){
return true;
}else{
//如果當前節點在節點列表中不是排第一的位置,則獲取當前節點前面的節點,並賦值
int wz = Collections.binarySearch(children, currentPath.substring(7));
beforePath = PATH2 + "/" + children.get(wz-1);
}
return false;
}
@Override
public void waitLock() { //等待鎖
IZkDataListener iZkDataListener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
//喚醒被等待的執行緒
if(countDownLatch != null){
countDownLatch.countDown();
}
}
public void handleDataChange(String path, Object data) throws Exception {
}
};
//註冊事件,對前一個節點進行監聽
zkClient.subscribeDataChanges(beforePath, iZkDataListener);
//如果節點存在了,則需要等待一直到接收到事件通知
if(zkClient.exists(beforePath)){
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
zkClient.unsubscribeDataChanges(beforePath, iZkDataListener);
}
//釋放鎖
public void unlock() {
zkClient.delete(currentPath);
zkClient.close();
}
}
執行main函式,
public class OrderService implements Runnable{
//訂單號生成類
private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
//方式1實現
//private Lock lock = new ZookeeperDistributeLock();
//方式2實現
private Lock lock = new ZookeeperDistributeLock2();
public void run() {
getNumber();
}
public void getNumber(){
try {
lock.getLock();
String number = orderNumGenerator.getNumber();
System.out.println(Thread.currentThread().getName() + ",產生了訂單:" + number);
} catch (Exception e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
public static void main(String[] args) {
System.out.println("##生成了訂單####");
for(int i=0;i<50;i++){
new Thread(new OrderService()).start();
}
}
}
看控制檯列印的結果,、
同樣得到了結果,但是方式2的實現上更可靠,這是基於zookeeper自身的可靠性機制實現的;
以上就是使用zookeeper實現分散式鎖的過程,當然實際工作中,還可以使用redis,mysql或者其他開源框架也是可以的,個人覺得使用zookeeper還是比較簡單而且具有天然的優勢,因為其可靠性已經通過眾多的其他各類框架和應用得到了檢驗,本文到此結束,不足之處,敬請見諒!