基於zookeeper的分散式鎖實現
工作中需要寫一個定時任務,由於是叢集環境,自然而然想到需要通過分散式鎖來保證單臺執行..相信大家都會想到使用zk來實現對應的分散式鎖.下面就簡單介紹一下幾種實現
準備工作
有幾個幫助類,先把程式碼放上來
ZKClient 對zk的操作做了一個簡單的封裝
Java程式碼
- packagezk.lock;
- importorg.apache.zookeeper.*;
- importorg.apache.zookeeper.data.Stat;
- importzk.util.ZKUtil;
- importjava.util.concurrent.CountDownLatch;
- importjava.util.concurrent.TimeUnit;
- /**
- *User:zhenghui
- *Date:14-3-26
- *Time:下午8:50
- *封裝一個zookeeper例項.
- */
- publicclassZKClientimplementsWatcher{
- privateZooKeeperzookeeper;
- privateCountDownLatchconnectedSemaphore=newCountDownLatch(1);
- publicZKClient(StringconnectString,intsessionTimeout)throwsException{
- zookeeper=newZooKeeper(connectString,sessionTimeout,this);
- System.out.println("connectingzkserver");
- if(connectedSemaphore.await(10l,TimeUnit.SECONDS)){
- System.out.println("connectzkserversuccess");
- }else{
- System.out.println("connectzkservererror.");
- thrownewException("connectzkservererror.");
- }
- }
- publicvoidclose()throwsInterruptedException{
- if(zookeeper!=null){
- zookeeper.close();
- }
- }
- publicvoidcreatePathIfAbsent(Stringpath,booleanisPersistent)throwsException{
- CreateModecreateMode=isPersistent?CreateMode.PERSISTENT:CreateMode.EPHEMERAL;
- path=ZKUtil.normalize(path);
- if(!this.exists(path)){
- zookeeper.create(path,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,createMode);
- }
- }
- publicbooleanexists(Stringpath)throwsException{
- path=ZKUtil.normalize(path);
- Statstat=zookeeper.exists(path,null);
- returnstat!=null;
- }
- publicStringgetData(Stringpath)throwsException{
- path=ZKUtil.normalize(path);
- try{
- byte[]data=zookeeper.getData(path,null,null);
- returnnewString(data);
- }catch(KeeperExceptione){
- if(einstanceofKeeperException.NoNodeException){
- thrownewException("Nodedoesnotexist,pathis["+e.getPath()+"].",e);
- }else{
- thrownewException(e);
- }
- }catch(InterruptedExceptione){
- Thread.currentThread().interrupt();
- thrownewException(e);
- }
- }
- @Override
- publicvoidprocess(WatchedEventevent){
- if(event==null)return;
- //連線狀態
- Watcher.Event.KeeperStatekeeperState=event.getState();
- //事件型別
- Watcher.Event.EventTypeeventType=event.getType();
- //受影響的path
- //Stringpath=event.getPath();
- if(Watcher.Event.KeeperState.SyncConnected==keeperState){
- //成功連線上ZK伺服器
- if(Watcher.Event.EventType.None==eventType){
- System.out.println("zookeeperconnectsuccess");
- connectedSemaphore.countDown();
- }
- }
- //下面可以做一些重連的工作.
- elseif(Watcher.Event.KeeperState.Disconnected==keeperState){
- System.out.println("zookeeperDisconnected");
- }elseif(Watcher.Event.KeeperState.AuthFailed==keeperState){
- System.out.println("zookeeperAuthFailed");
- }elseif(Watcher.Event.KeeperState.Expired==keeperState){
- System.out.println("zookeeperExpired");
- }
- }
- }
ZKUtil 針對zk路徑的一個工具類
Java程式碼
- packagezk.util;
- /**
- *User:zhenghui
- *Date:14-3-26
- *Time:下午9:56
- */
- publicclassZKUtil{
- publicstaticfinalStringSEPARATOR="/";
- /**
- *轉換path為zk的標準路徑以/開頭,最後不帶/
- */
- publicstaticStringnormalize(Stringpath){
- Stringtemp=path;
- if(!path.startsWith(SEPARATOR)){
- temp=SEPARATOR+path;
- }
- if(path.endsWith(SEPARATOR)){
- temp=temp.substring(0,temp.length()-1);
- returnnormalize(temp);
- }else{
- returntemp;
- }
- }
- /**
- *連結兩個path,並轉化為zk的標準路徑
- */
- publicstaticStringcontact(Stringpath1,Stringpath2){
- if(path2.startsWith(SEPARATOR)){
- path2=path2.substring(1);
- }
- if(path1.endsWith(SEPARATOR)){
- returnnormalize(path1+path2);
- }else{
- returnnormalize(path1+SEPARATOR+path2);
- }
- }
- /**
- *字串轉化成byte型別
- */
- publicstaticbyte[]toBytes(Stringdata){
- if(data==null||data.trim().equals(""))returnnull;
- returndata.getBytes();
- }
- }
NetworkUtil 獲取本機IP的工具方法
Java程式碼
- packagezk.util;
- importjava.net.InetAddress;
- importjava.net.NetworkInterface;
- importjava.util.Enumeration;
- /**
- *User:zhenghui
- *Date:14-4-1
- *Time:下午4:47
- */
- publicclassNetworkUtil{
- staticprivatefinalcharCOLON=':';
- /**
- *獲取當前機器ip地址
- *據說多網絡卡的時候會有問題.
- */
- publicstaticStringgetNetworkAddress(){
- Enumeration<NetworkInterface>netInterfaces;
- try{
- netInterfaces=NetworkInterface.getNetworkInterfaces();
- InetAddressip;
- while(netInterfaces.hasMoreElements()){
- NetworkInterfaceni=netInterfaces
- .nextElement();
- Enumeration<InetAddress>addresses=ni.getInetAddresses();
- while(addresses.hasMoreElements()){
- ip=addresses.nextElement();
- if(!ip.isLoopbackAddress()
- &&ip.getHostAddress().indexOf(COLON)==-1){
- returnip.getHostAddress();
- }
- }
- }
- return"";
- }catch(Exceptione){
- return"";
- }
- }
- }
--------------------------- 正文開始 -----------------------------------
這種實現非常簡單,具體的流程如下
對應的實現如下
Java程式碼
- packagezk.lock;
- importzk.util.NetworkUtil;
- importzk.util.ZKUtil;
- /**
- *User:zhenghui
- *Date:14-3-26
- *Time:下午8:37
- *分散式鎖實現.
- *
- *這種實現的原理是,建立某一個任務的節點,比如/lock/tasckname然後獲取對應的值,如果是當前的Ip,那麼獲得鎖,如果不是,則沒獲得
- *.如果該節點不存在,則建立該節點,並把改節點的值設定成當前的IP
- */
- publicclassDistributedLock01{
- privateZKClientzkClient;
- publicstaticfinalStringLOCK_ROOT="/lock";
- privateStringlockName;
- publicDistributedLock01(StringconnectString,intsessionTimeout,StringlockName)throwsException{
- //先建立zk連結.
- this.createConnection(connectString,sessionTimeout);
- this.lockName=lockName;
- }
- publicbooleantryLock(){
- Stringpath=ZKUtil.contact(LOCK_ROOT,lockName);
- StringlocalIp=NetworkUtil.getNetworkAddress();
- try{
- if(zkClient.exists(path)){
- StringownnerIp=zkClient.getData(path);
- if(localIp.equals(ownnerIp)){
- returntrue;
- }
- }else{
- zkClient.createPathIfAbsent(path,false);
- if(zkClient.exists(path)){
- StringownnerIp=zkClient.getData(path);
- if(localIp.equals(ownnerIp)){
- returntrue;
- }
- }
- }
- }catch(Exceptione){
- e.printStackTrace();
- }
- returnfalse;
- }
- /**
- *建立zk連線
- *
- */
- protectedvoidcreateConnection(StringconnectString,intsessionTimeout)throwsException{
- if(zkClient!=null){
- releaseConnection();
- }
- zkClient=newZKClient(connectString,sessionTimeout);
- zkClient.createPathIfAbsent(LOCK_ROOT,true);
- }
- /**
- *關閉ZK連線
- */
- protectedvoidreleaseConnection()throwsInterruptedException{
- if(zkClient!=null){
- zkClient.close();
- }
- }
- }
總結
網上有很多文章,大家的方法大多數都是建立一個root根節點,每一個trylock的客戶端都會在root下建立一個EPHEMERAL_SEQUENTIAL 的子節點,同時設定root的child 變更watcher(為了避免羊群效應,可以只新增
前一個節點的變更通知) .如果建立的節點的序號是最小,則獲取到鎖,否則繼續等待root的child 變更
框架簡介:
本系統一款通用的SOA中介軟體平臺,用來開發各類J2EE企業級應用,節省時間和人力成本。本系統採用MVC模式、AOP引擎、任務排程器、工作流、Ajax、攔截器、過濾器、快取、日誌監控、資料訪問、表示式、國際化等技術。
使用者許可權系統:
組織結構:角色、使用者、使用者組、組織機構;許可權點:頁面、方法、按鈕、資料許可權、分級授權
專案管理新體驗:
快速出原型系統、元件樹、版本控制、模組移植、協同開發、實時監控、釋出管理
可持續整合:
所有元件可移植、可定製、可擴充,開發成果不斷積累,形成可持續發展的良性迴圈
框架/平臺構成:
Springmvc + Mybatis + Shiro(許可權)+SSO(單點登入) + Tiles(模板) +ActiveMQ(訊息佇列) + Rest(服務) + WebService(服務)+ EHcache(快取) + Lucene(搜尋引擎) + Quartz(定時排程)+ Html5(支援PC、IOS、Android)
支援平臺平臺:
Windows XP、Windows 7 、Windows 10 、 Linux 、 Unix
伺服器容器:
Tomcat 5/6/7 、Jetty、JBoss、WebSphere 8.5
專案原始碼結構截圖:
轉載於:https://my.oschina.net/grthrj/blog/729114