1. 程式人生 > >基於zookeeper實現分散式鎖

基於zookeeper實現分散式鎖

引言

在程式開發過程中不得不考慮的就是併發問題。在java中對於同一個jvm而言,jdk已經提供了lock和同步等。但是在分散式情況下,往往存在多個程序對一些資源產生競爭關係,而這些程序往往在不同的機器上,這個時候jdk中提供的已經不能滿足。分散式鎖顧明思議就是可以滿足分散式情況下的併發鎖。 下面我們講解怎麼利用zk實現分散式鎖。

ZooKeeper是一個分散式的,開放原始碼的分散式應用程式協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要元件。它是一個為分散式應用提供一致性服務的軟體,提供的功能包括:配置維護、域名服務、分散式同步、組服務等。

ZooKeeper的架構通過冗餘服務實現高可用性。因此,如果第一次無應答,客戶端就可以詢問另一臺ZooKeeper主機。ZooKeeper節點將它們的資料儲存於一個分層的名稱空間,非常類似於一個檔案系統或一個字首樹結構。客戶端可以在節點讀寫,從而以這種方式擁有一個共享的配置服務。更新是全序的。

基於ZooKeeper分散式鎖的流程

  • 在zookeeper指定節點(locks)下建立臨時順序節點node_n
  • 獲取locks下所有子節點children
  • 對子節點按節點自增序號從小到大排序
  • 判斷本節點是不是第一個子節點,若是,則獲取鎖;若不是,則監聽比該節點小的那個節點的刪除事件
  • 若監聽事件生效,則回到第二步重新進行判斷,直到獲取到鎖

具體實現

下面就具體使用java和zookeeper實現分散式鎖,操作zookeeper使用的是apache提供的zookeeper的包。

  • 通過實現Watch介面,實現process(WatchedEvent
    event)方法來實施監控,使CountDownLatch來完成監控,在等待鎖的時候使用CountDownLatch來計數,等到後進行countDown,停止等待,繼續執行。
  • 以下整體流程基本與上述描述流程一致,只是在監聽的時候使用的是CountDownLatch來監聽前一個節點。

程式碼部分

整個程式碼結構如圖:
在這裡插入圖片描述

1、首先建立一個介面Lock,顧名思義,大家很容易想到JDK下面有一個Lock的介面,但是這裡我並不打算直接使用這個介面,只是模擬了該接口裡的方法自定義實現,這樣可以按需使用,請看裡面的程式碼,

public interface Lock {
	public void getLock();
	public void unlock();
}

接口裡面比較簡單,一個是獲取鎖的介面,一個是釋放鎖的介面,

2、再來看AbstratcLock這個類,這是個抽象類,裡面有一個獲取鎖的方法和兩個待實現的抽象方法,

//定義基本模板
public abstract class AbstratcLock implements Lock{

	public void getLock() {
		if(tryLock()){
			System.out.println("##獲取鎖的資源===============");
		}else{
			waitLock();
			getLock();
		}
	}
	public abstract boolean tryLock();
	public abstract void waitLock();
}

3、zookeeper的基本配置項在ZookeeperAbstractLock這個類裡面,在這裡,我沒有使用單獨的配置檔案或者類去配置連線zookeeper的配置資訊,就在這個抽象類裡面全部搞定,

public abstract class ZookeeperAbstractLock extends AbstratcLock{
	
	private static String CONNECT_PATH = "127.0.0.1:2181";
	
	protected ZkClient zkClient = new ZkClient(CONNECT_PATH);
	
	protected static final String PATH = "/lock";
	
	protected static final String PATH2 = "/lock2";
	
}

4、接下來是zookeeper實現具體的分散式鎖的業務邏輯部分,實現的思路前面已經解釋過,再次總結一下就是,zookeeper基於記憶體實現的一種檔案樹節點,一旦某個執行緒成功建立了某個節點,其他執行緒繼續建立同名節點就無法成功,但該執行緒可以註冊一個監聽器,監聽上一個執行緒對該節點的變換情況,通過這個機制來判定對該節點的鎖的持有和釋放,從而實現效果,下面通過兩種方式來實現,

4.1 直接通過一個節點實現分散式鎖,程式碼如下,
public class ZookeeperDistributeLock extends ZookeeperAbstractLock{

	private CountDownLatch countDownLatch = null;
	
	//嘗試獲得鎖
	@Override
	public boolean tryLock() {
		try {
			zkClient.createEphemeral(PATH);
			return true;
		} catch (Exception e) {
			return false;
		}
	}
	
	//監聽某個節點,匿名回撥函式實現對節點資訊變化的監聽,
	@Override
	public void waitLock() {
		
		//一旦zookeeper檢測到節點資訊的變化,就會觸發匿名匿名回撥函式,通知訂閱的客戶端,即zkClient
		IZkDataListener iZkDataListener = new IZkDataListener() {
			
			public void handleDataDeleted(String path) throws Exception {
				//喚醒被等待的執行緒
				if(countDownLatch != null){
					countDownLatch.countDown();
				}
			}
			
			public void handleDataChange(String path, Object data) throws Exception {
				
			}
		};
		//註冊事件監聽
		zkClient.subscribeDataChanges(PATH, iZkDataListener);
		
		//如果節點存在了,則需要等待一直到接收到了事件通知
		if(zkClient.exists(PATH)){
			countDownLatch = new CountDownLatch(1);
			try {
				countDownLatch.await();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		zkClient.unsubscribeDataChanges(PATH, iZkDataListener);
	}
	
	//釋放鎖
	public void unlock() {
		if(zkClient != null){
			zkClient.delete(PATH);
			zkClient.close();
			System.out.println("釋放鎖資源");
		}
	}
}

5、下面是測試類,模擬50個執行緒併發生成訂單的動作,OrderService該類,程式碼如下,

public class OrderService implements Runnable{
	
	//訂單號生成類
	private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
	
	private Lock lock = new ZookeeperDistributeLock();
	//private Lock lock = new ZookeeperDistributeLock2();
	
	public void run() {
		getNumber();
	}
	
	public void getNumber(){
		try {
			lock.getLock();
			String number = orderNumGenerator.getNumber();
			System.out.println(Thread.currentThread().getName() + ",產生了訂單:" + number);
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			lock.unlock();
		}
	}
	
	public static void main(String[] args) {
		System.out.println("##生成了訂單####");
		for(int i=0;i<50;i++){
			new Thread(new OrderService()).start();
		}
	}
	
}

將生成訂單的類的程式碼也附上,

public class OrderNumGenerator {
	
	public static int count =0;
	private Lock lock = new ReentrantLock();
	
	public String getNumber(){
		try {
			lock.lock();
			SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
			return sdf.format(new Date()) + "_" + ++count;
		} finally {
			lock.unlock();
		}
	}
	
}

下面執行上述的OrderService的main函式,看控制檯輸出結果,
在這裡插入圖片描述

上述50個執行緒很快就執行完畢了,而且沒有出現任何問題,我們知道zookeeper是基於記憶體型的分散式協調伺服器,大家有沒有發現,我這裡只建立了一個檔案節點,就可以實現效果,這主要是得益於記憶體操作的快速,但是問題來了,如果執行緒數量足夠多,等於是某個執行緒釋放了鎖,其他的執行緒一起去爭奪鎖,那樣,不管記憶體的執行速度有多快,總會有併發爭奪的時候出現,所以下面演示用zookeeper的臨時有序節點的方式實現,

6、實現的步驟上面已經說明,這裡再說一下,主要步驟是:

  1. 建立一個節點,這裡為:lock2 。節點型別為持久節點(PERSISTENT)
  2. 每當程序需要訪問共享資源時,會呼叫分散式鎖的lock()或tryLock()方法獲得鎖,這個時候會在第一步建立的lock節點下建立相應的順序子節點,節點型別為臨時順序節點(EPHEMERAL_SEQUENTIAL),由於在這樣的節點模式下,繼續建立同名節點,會直接在該節點下生成一個有序的臨時子節點編號,從小到大,依次排序;
  3. 在建立子節點後,對lock下面的所有以name開頭的子節點進行排序,判斷剛剛建立的子節點順序號是否是最小的節點,假如是最小節點,則獲得該鎖對資源進行訪問。
  4. 假如不是該節點,就獲得該節點的上一順序節點,並給該節點是否存在註冊監聽事件。同時在這裡阻塞。等待監聽事件的發生,獲得鎖控制權。
  5. 當呼叫完共享資源後,呼叫unlock()方法,關閉zk,進而可以引發監聽事件,釋放該鎖。

程式碼如下,

public class ZookeeperDistributeLock2 extends ZookeeperAbstractLock{

	private CountDownLatch countDownLatch = null;
	
	private String beforePath;		//前一個節點
	private String currentPath;		//當前節點
	
	//初始化主節點,如果不存在則建立
	public ZookeeperDistributeLock2(){
		if(!this.zkClient.exists(PATH2)){
			this.zkClient.createPersistent(PATH2);
		}
	}
	
	@Override
	public boolean tryLock() {
		//基於lock2節點,新建一個臨時節點
		if(currentPath == null || currentPath.length() <= 0){
			currentPath = this.zkClient.createEphemeralSequential(PATH2 + "/", beforePath);
		}
		//獲取所有臨時節點並進行排序
		List<String> children = this.zkClient.getChildren(PATH2);
		Collections.sort(children);
		
		if(currentPath.equals(PATH2 + "/" + children.get(0))){
			return true;
		}else{
			//如果當前節點在節點列表中不是排第一的位置,則獲取當前節點前面的節點,並賦值
			int wz = Collections.binarySearch(children, currentPath.substring(7));
			beforePath = PATH2 + "/" + children.get(wz-1);
		}
		return false;
	}

	@Override
	public void waitLock() {	//等待鎖
		IZkDataListener iZkDataListener = new IZkDataListener() {
			
			public void handleDataDeleted(String dataPath) throws Exception {
				//喚醒被等待的執行緒
				if(countDownLatch != null){
					countDownLatch.countDown();
				}
			}
			public void handleDataChange(String path, Object data) throws Exception {
				
			}
		};
		//註冊事件,對前一個節點進行監聽
		zkClient.subscribeDataChanges(beforePath, iZkDataListener);
		
		//如果節點存在了,則需要等待一直到接收到事件通知
		if(zkClient.exists(beforePath)){
			countDownLatch = new CountDownLatch(1);
			try {
				countDownLatch.await();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		zkClient.unsubscribeDataChanges(beforePath, iZkDataListener);
		
	}
	
	//釋放鎖
	public void unlock() {
		zkClient.delete(currentPath);
		zkClient.close();
	}
	
}

執行main函式,

public class OrderService implements Runnable{
	
	//訂單號生成類
	private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
	
	//方式1實現
	//private Lock lock = new ZookeeperDistributeLock();
	
	//方式2實現
	private Lock lock = new ZookeeperDistributeLock2();
	
	public void run() {
		getNumber();
	}
	
	public void getNumber(){
		try {
			lock.getLock();
			String number = orderNumGenerator.getNumber();
			System.out.println(Thread.currentThread().getName() + ",產生了訂單:" + number);
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			lock.unlock();
		}
	}
	
	public static void main(String[] args) {
		System.out.println("##生成了訂單####");
		for(int i=0;i<50;i++){
			new Thread(new OrderService()).start();
		}
	}
	
}

看控制檯列印的結果,、在這裡插入圖片描述

同樣得到了結果,但是方式2的實現上更可靠,這是基於zookeeper自身的可靠性機制實現的;

以上就是使用zookeeper實現分散式鎖的過程,當然實際工作中,還可以使用redis,mysql或者其他開源框架也是可以的,個人覺得使用zookeeper還是比較簡單而且具有天然的優勢,因為其可靠性已經通過眾多的其他各類框架和應用得到了檢驗,本文到此結束,不足之處,敬請見諒!