1. 程式人生 > >ZooKeeper 分布式共享鎖的實現

ZooKeeper 分布式共享鎖的實現

客戶端 客戶端訪問服務器 tps client ech 超時 script isn pan

原創播客,如需轉載請註明出處。原文地址:http://www.cnblogs.com/crawl/p/8352919.html

----------------------------------------------------------------------------------------------------------------------------------------------------------

筆記中提供了大量的代碼示例,需要說明的是,大部分代碼示例都是本人所敲代碼並進行測試,不足之處,請大家指正~

本博客中所有言論僅代表博主本人觀點,若有疑惑或者需要本系列分享中的資料工具,敬請聯系 [email protected]

GitHub:https://github.com/QingqingQi

-----------------------------------------------------------------------------------------------------------------------------------------------------------

前言:ZooKeeper 是提供少量數據存儲和管理的分布式協調服務。適合存儲狀態管理信息,可以進行數據的讀寫,同步,提供對數據節點的監聽功能。利用 ZooKeeper 可以實現很多功能,比如:Hadoop2.0,使用 Zookeeper 的事件處理確保整個集群只有一個活躍的 NameNode,存儲配置信息等;可以利用 ZooKeeper 感知集群中哪臺主機宕機或者下線等等。今天介紹另一個常用的功能,利用 Zookeeper 實現分布式共享鎖。

一、簡要介紹

利用 Zookeeper 實現分布式共享鎖,可以做到一次只有指定個數的客戶端訪問服務器的某些資源。

二、實現步驟

利用 Zookeeper 實現分布式共享鎖的步驟大致可以分為以下幾步:

1. 客戶端上線即向 Zookeeper 註冊,創建一把鎖

2. 判斷是否只有一個客戶端工作,若只有一個客戶端工作,此客戶端可以處理業務

3. 獲取父節點下註冊的所有鎖,通過判斷自己是否是號碼最小的那一把鎖,若是則可以處理業務,否則等待

值的註意的是,在某一客戶端獲取到鎖處理完業務後,必須釋放鎖

三、實現代碼

1. 新建一個 DistributedLock 類

private ZooKeeper zkClient = null
; //連接字符串 private static final String connectString = "zookeeper01:2181,zookeeper02:2181,zookeeper03:2181"; //超時時間 private static final int sessionTimeout = 2000; //父節點 private static final String parentNode = "/locks"; //記錄自己創建子節點的路徑 private volatile String thisPath; public static void main(String[] args) throws Exception { //1.獲取 ZooKeeper 的客戶端連接 DistributedLock distLock = new DistributedLock(); distLock.getZKClient(); //2.註冊一把鎖 distLock.regiestLock(); //3.監聽父節點,判斷是否只有自己在線 distLock.watchParent(); }

2. main 方法中定義了三個方法

1)getZKClient():用來獲取 Zookeeper 客戶端的連接

其中 process 方法是當監聽節點發生變化時調用,其中獲取定義的父節點的所有子節點,然後判斷當前節點是否是最小節點,若是則進行業務邏輯處理階段,並重新註冊一把新的鎖

//獲取 zk 客戶端
    public void getZKClient() throws Exception {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            
            @Override
            public void process(WatchedEvent event) {
                //判斷事件類型,只處理子節點變化事件
                if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(parentNode)) {
                    try {
                        List<String> childrens = zkClient.getChildren(parentNode, true);
                        //判斷自己是否是最小的
                        String thisNode = thisPath.substring((parentNode + "/").length());
                        Collections.sort(childrens);
                        if(childrens.indexOf(thisNode) == 0){
                            //處理業務邏輯
                            dosomething();
                            //重新註冊一把新的鎖
                            thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
}

2)main 中的第二個方法是 rediestLock()

調用 Zookeeper 客戶端的 create() 方法,建立一個新的節點

//註冊一把鎖
    public void regiestLock() throws Exception {
        thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

3)第三個是 watchParent() 方法

在此方法中判斷是否只有一個節點在線,若只有自己一個節點,則調用業務處理的方法

//監聽父節點,判斷是否只有自己在線
    public void watchParent() throws Exception {
        List<String> childrens = zkClient.getChildren(parentNode, true);
        if (childrens != null && childrens.size() == 1) {
            //只有自己在線,處理業務邏輯(處理完業務邏輯,必須刪釋放鎖)
            dosomething();
        } else {
            //不是只有自己在線,說明別人已經獲取到鎖,等待
            Thread.sleep(Long.MAX_VALUE);
        }
    }

4)最後一個是自定義的業務邏輯方法

需要註意的是,當處理完業務邏輯後,必須釋放鎖

//業務邏輯方法,註意:需要在最後釋放鎖
    public void dosomething() throws Exception {
        System.out.println("或得到鎖:" + thisPath);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("釋放鎖:" + thisPath);
            zkClient.delete(thisPath, -1);
        }
    }

3. 最後貼一下全部代碼

package com.software.bigdata.zkdistlock;

import java.util.Collections;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/**
 * @Description: 分布式共享鎖
 * 
 * @author Crawl
 * @date 2018年1月25日 下午5:02:42
 */
public class DistributedLock {
    
    private ZooKeeper zkClient = null;
    
    //連接字符串
    private static final String connectString = "zookeeper01:2181,zookeeper02:2181,zookeeper03:2181";
    
    //超時時間
    private static final int sessionTimeout = 2000;
    
    //父節點
    private static final String parentNode = "/locks";
    
    //記錄自己創建子節點的路徑
    private volatile String thisPath;
    
    public static void main(String[] args) throws Exception {
        //1.獲取 ZooKeeper 的客戶端連接
        DistributedLock distLock = new DistributedLock();
        distLock.getZKClient();
        
        //2.註冊一把鎖
        distLock.regiestLock();
        
        //3.監聽父節點,判斷是否只有自己在線
        distLock.watchParent();
    }
    
    //業務邏輯方法,註意:需要在最後釋放鎖
    public void dosomething() throws Exception {
        System.out.println("或得到鎖:" + thisPath);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("釋放鎖:" + thisPath);
            zkClient.delete(thisPath, -1);
        }
    }
    
    //監聽父節點,判斷是否只有自己在線
    public void watchParent() throws Exception {
        List<String> childrens = zkClient.getChildren(parentNode, true);
        if (childrens != null && childrens.size() == 1) {
            //只有自己在線,處理業務邏輯(處理完業務邏輯,必須刪釋放鎖)
            dosomething();
        } else {
            //不是只有自己在線,說明別人已經獲取到鎖,等待
            Thread.sleep(Long.MAX_VALUE);
        }
    }
    
    //註冊一把鎖
    public void regiestLock() throws Exception {
        thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }
    
    //獲取 zk 客戶端
    public void getZKClient() throws Exception {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            
            @Override
            public void process(WatchedEvent event) {
                //判斷事件類型,只處理子節點變化事件
                if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(parentNode)) {
                    try {
                        List<String> childrens = zkClient.getChildren(parentNode, true);
                        //判斷自己是否是最小的
                        String thisNode = thisPath.substring((parentNode + "/").length());
                        Collections.sort(childrens);
                        if(childrens.indexOf(thisNode) == 0){
                            //處理業務邏輯
                            dosomething();
                            //重新註冊一把新的鎖
                            thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }

}

ZooKeeper 分布式共享鎖的實現