1. 程式人生 > 實用技巧 >zookeeper實現分散式讀寫鎖

zookeeper實現分散式讀寫鎖

  大多數應用程式在大部分時間都是在響應資料讀取事件,而資料的讀寫併發可能會帶來資料髒讀(讀執行緒在讀取的資料,寫執行緒已經修改但未提交)等問題,基於類似的可能會發生對同一個資源進行讀寫操作的場景,我們需要實現讀寫鎖來保證資料準確,讀寫鎖是指讀執行緒之間不存在互斥,讀寫及寫寫之間存在互斥我們可以用一個表格來列舉兩個執行緒併發時的所有情況:

執行緒A 執行緒B 通過性
×
×
×

  

  java.util.concurrent.locks包下的一個實現類ReentrantReadWriteLock可以解決單個應用中的讀寫鎖場景,ReentrantReadWriteLock是一個可重入鎖,重入的意思是具有相同性質的執行緒在同一時刻可以獲取到該鎖,ReentrantReadWriteLock的使用為讀執行緒在進行業務操作之前呼叫readLock().lock()方法進行讀鎖定,業務操作完畢之後呼叫readLock().unlock()方法進行解鎖,寫執行緒進行寫業務操作之前呼叫rwriteLock().lock()方法進行資源鎖定,在寫業務操作完成後呼叫writeLock().unlock()進行寫鎖解鎖。

  ReentrantReadWriteLock只能滿足單應用的讀寫鎖需求,對於分散式的多個應用則無能為力,例如我們要操作Hadoop(分散式檔案系統)上的一份資料,我們在不同的伺服器都安裝了應用程式(作為負載均衡),不同伺服器應用程式可能會併發(讀寫併發)請求該資料,這個時候我們就要單獨寫一個專門管理讀寫鎖獲取及解鎖的應用,部署在中心伺服器,所有讀寫操作伺服器上應用程式在進行業務操作之前都要先訪問該讀寫鎖管理應用,要實現一個可靠的讀寫鎖管理應用要解決的難題很多,所幸的是Zookeeper已經為我們提供了類似的服務,我們只要稍加利用便可以實現我們想要的效果。

  Zookeeper是一個為分散式應用提供一致性服務的應用,可以把Zookeeper看作一個小型檔案系統有目錄(有子檔案或目錄)有檔案(檔案可以記錄資料也可以作為目錄),因此我們把目錄和檔案都稱為節點,節點有永續性的和臨時性(客戶端斷開後會被Zookeeper刪除)兩類,而其中又分為順序型(建立該型別節點的時候Zookeeper會自動新增一個父目錄下自增序列,這樣我們可以避免節點名稱的重複)和非順序型。

  有了這些基礎之後我們就可以創造自己的分散式讀寫鎖了,程式碼入下:

  

package com.shenyuchong.zkclient;
import com.gbd.database.Cache;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.util.List;
import java.util.Random;

public class ZkClient { private static Logger log = Logger.getLogger(ZkClient.class); private final static String LOCK_HOME="/shenyuchong";//分散式鎖的工作目錄 private static Object lock = new Object(); private static ZooKeeper zooKeeper = null; public static void init() throws Exception { /** * 一開始就與Zookeeper伺服器連線上 */ zooKeeper = new ZooKeeper(Cache.ZOOKEEPER_CONNECT_STRING, 5000, null); if(zooKeeper!=null){ if(zooKeeper.exists(LOCK_HOME, false)==null){ zooKeeper.create(LOCK_HOME,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } } public static Sequent readLock(String path){ path=LOCK_HOME+(path.startsWith("/")?path:"/"+path);//傳入目錄不是/開頭的新增一個斜槓 String seq = null; try { createPath(zooKeeper,path);//防止目錄不存在先建立(同步) boolean hasWrite=true;//是否存在寫鎖 while(hasWrite){ hasWrite=false; /** * 在path目錄下先建立一個讀r_開頭的順序型臨時節點,然後再判斷該目錄下的孩子中是否有寫節點(w_開頭的節點) */ seq = zooKeeper.create(path + "/r_", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); List<String> children = zooKeeper.getChildren(path, false); for(String c:children){ if(c.startsWith("w_")) { hasWrite=true; break; } } if(hasWrite){ /** * 當發現有寫執行緒,則刪除剛才建立的讀節點,並阻塞當前執行緒,等待通知或者等待一個隨機時間 */ zooKeeper.delete(seq,-1); synchronized (lock) { /** * 隨機時間不能小於3秒,因為我們認為寫任務一搬沒這麼快完成 */ Random rand =new Random(); int i = rand.nextInt(10000); if(i<3000) i=3000; lock.wait(i); } } } } catch (Exception e) { e.printStackTrace(); } return new Sequent(seq); } public static Sequent writeLock(String path){ path=LOCK_HOME+(path.startsWith("/")?path:"/"+path); String seq = null; try { createPath(zooKeeper,path); boolean hasRead=true; while(hasRead){ /** * 在path目錄下先建立一個寫w_開頭的順序型臨時節點,然後再判斷該目錄下的孩子中是否有節點(不管是寫執行緒還是讀執行緒都應該等待) */ seq = zooKeeper.create(path + "/w_", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); List<String> children = zooKeeper.getChildren(path, false); if(children!=null&&children.size()>1){ /** * 當發現有寫執行緒,則刪除剛才建立的讀節點,並阻塞當前執行緒,等待通知或者等待一個隨機時間 */ zooKeeper.delete(seq,-1); hasRead=true; synchronized (lock) { Random rand =new Random(); /** * 隨機時間不能小於1秒,因為我們認為讀執行緒應該進可能的響應 */ int i = rand.nextInt(10000); if(i<1000) i=1000; lock.wait(i); } }else{ hasRead=false; } } } catch (Exception e) { e.printStackTrace(); } return new Sequent(seq); } public static void unLock(Sequent sequent){ if(sequent==null||sequent.getName()==null||"".equals(sequent.getName())) return; try { /** * 刪除又執行緒建立的節點就代表解鎖了,最後需要通知所有執行緒,分散式的程式通知不了,所以需要在wait()中傳入等待的時間引數 */ zooKeeper.delete(sequent.getName(),-1); } catch (Exception e) { e.printStackTrace(); }finally { synchronized (lock){ lock.notifyAll();//通知的話可以減少等待時間 } } } public static void createPath(ZooKeeper zooKeeper,String path){ try { if(zooKeeper.exists(path, false)==null) zooKeeper.create(path,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//建立一個持久化節點 }catch (Exception e){ e.printStackTrace(); } }
  //節點名稱(用於給解鎖用,單一個字串約束性低)
public static class Sequent{ private String name; public Sequent(String name) { this.name = name; } public String getName() { return name; } } }