1. 程式人生 > 實用技巧 >基於zookeeper的分散式鎖實現

基於zookeeper的分散式鎖實現

>>> hot3.png

工作中需要寫一個定時任務,由於是叢集環境,自然而然想到需要通過分散式鎖來保證單臺執行..相信大家都會想到使用zk來實現對應的分散式鎖.下面就簡單介紹一下幾種實現

準備工作

有幾個幫助類,先把程式碼放上來

ZKClient 對zk的操作做了一個簡單的封裝

Java程式碼收藏程式碼

  1. packagezk.lock;
  2. importorg.apache.zookeeper.*;
  3. importorg.apache.zookeeper.data.Stat;
  4. importzk.util.ZKUtil;
  5. importjava.util.concurrent.CountDownLatch;
  6. importjava.util.concurrent.TimeUnit;
  7. /**
  8. *User:zhenghui
  9. *Date:14-3-26
  10. *Time:下午8:50
  11. *封裝一個zookeeper例項.
  12. */
  13. publicclassZKClientimplementsWatcher{
  14. privateZooKeeperzookeeper;
  15. privateCountDownLatchconnectedSemaphore=newCountDownLatch(1);
  16. publicZKClient(StringconnectString,intsessionTimeout)throwsException{
  17. zookeeper=newZooKeeper(connectString,sessionTimeout,this);
  18. System.out.println("connectingzkserver");
  19. if(connectedSemaphore.await(10l,TimeUnit.SECONDS)){
  20. System.out.println("connectzkserversuccess");
  21. }else{
  22. System.out.println("connectzkservererror.");
  23. thrownewException("connectzkservererror.");
  24. }
  25. }
  26. publicvoidclose()throwsInterruptedException{
  27. if(zookeeper!=null){
  28. zookeeper.close();
  29. }
  30. }
  31. publicvoidcreatePathIfAbsent(Stringpath,booleanisPersistent)throwsException{
  32. CreateModecreateMode=isPersistent?CreateMode.PERSISTENT:CreateMode.EPHEMERAL;
  33. path=ZKUtil.normalize(path);
  34. if(!this.exists(path)){
  35. zookeeper.create(path,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,createMode);
  36. }
  37. }
  38. publicbooleanexists(Stringpath)throwsException{
  39. path=ZKUtil.normalize(path);
  40. Statstat=zookeeper.exists(path,null);
  41. returnstat!=null;
  42. }
  43. publicStringgetData(Stringpath)throwsException{
  44. path=ZKUtil.normalize(path);
  45. try{
  46. byte[]data=zookeeper.getData(path,null,null);
  47. returnnewString(data);
  48. }catch(KeeperExceptione){
  49. if(einstanceofKeeperException.NoNodeException){
  50. thrownewException("Nodedoesnotexist,pathis["+e.getPath()+"].",e);
  51. }else{
  52. thrownewException(e);
  53. }
  54. }catch(InterruptedExceptione){
  55. Thread.currentThread().interrupt();
  56. thrownewException(e);
  57. }
  58. }
  59. @Override
  60. publicvoidprocess(WatchedEventevent){
  61. if(event==null)return;
  62. //連線狀態
  63. Watcher.Event.KeeperStatekeeperState=event.getState();
  64. //事件型別
  65. Watcher.Event.EventTypeeventType=event.getType();
  66. //受影響的path
  67. //Stringpath=event.getPath();
  68. if(Watcher.Event.KeeperState.SyncConnected==keeperState){
  69. //成功連線上ZK伺服器
  70. if(Watcher.Event.EventType.None==eventType){
  71. System.out.println("zookeeperconnectsuccess");
  72. connectedSemaphore.countDown();
  73. }
  74. }
  75. //下面可以做一些重連的工作.
  76. elseif(Watcher.Event.KeeperState.Disconnected==keeperState){
  77. System.out.println("zookeeperDisconnected");
  78. }elseif(Watcher.Event.KeeperState.AuthFailed==keeperState){
  79. System.out.println("zookeeperAuthFailed");
  80. }elseif(Watcher.Event.KeeperState.Expired==keeperState){
  81. System.out.println("zookeeperExpired");
  82. }
  83. }
  84. }

ZKUtil 針對zk路徑的一個工具類

Java程式碼收藏程式碼

  1. packagezk.util;
  2. /**
  3. *User:zhenghui
  4. *Date:14-3-26
  5. *Time:下午9:56
  6. */
  7. publicclassZKUtil{
  8. publicstaticfinalStringSEPARATOR="/";
  9. /**
  10. *轉換path為zk的標準路徑以/開頭,最後不帶/
  11. */
  12. publicstaticStringnormalize(Stringpath){
  13. Stringtemp=path;
  14. if(!path.startsWith(SEPARATOR)){
  15. temp=SEPARATOR+path;
  16. }
  17. if(path.endsWith(SEPARATOR)){
  18. temp=temp.substring(0,temp.length()-1);
  19. returnnormalize(temp);
  20. }else{
  21. returntemp;
  22. }
  23. }
  24. /**
  25. *連結兩個path,並轉化為zk的標準路徑
  26. */
  27. publicstaticStringcontact(Stringpath1,Stringpath2){
  28. if(path2.startsWith(SEPARATOR)){
  29. path2=path2.substring(1);
  30. }
  31. if(path1.endsWith(SEPARATOR)){
  32. returnnormalize(path1+path2);
  33. }else{
  34. returnnormalize(path1+SEPARATOR+path2);
  35. }
  36. }
  37. /**
  38. *字串轉化成byte型別
  39. */
  40. publicstaticbyte[]toBytes(Stringdata){
  41. if(data==null||data.trim().equals(""))returnnull;
  42. returndata.getBytes();
  43. }
  44. }

NetworkUtil 獲取本機IP的工具方法

Java程式碼收藏程式碼

  1. packagezk.util;
  2. importjava.net.InetAddress;
  3. importjava.net.NetworkInterface;
  4. importjava.util.Enumeration;
  5. /**
  6. *User:zhenghui
  7. *Date:14-4-1
  8. *Time:下午4:47
  9. */
  10. publicclassNetworkUtil{
  11. staticprivatefinalcharCOLON=':';
  12. /**
  13. *獲取當前機器ip地址
  14. *據說多網絡卡的時候會有問題.
  15. */
  16. publicstaticStringgetNetworkAddress(){
  17. Enumeration<NetworkInterface>netInterfaces;
  18. try{
  19. netInterfaces=NetworkInterface.getNetworkInterfaces();
  20. InetAddressip;
  21. while(netInterfaces.hasMoreElements()){
  22. NetworkInterfaceni=netInterfaces
  23. .nextElement();
  24. Enumeration<InetAddress>addresses=ni.getInetAddresses();
  25. while(addresses.hasMoreElements()){
  26. ip=addresses.nextElement();
  27. if(!ip.isLoopbackAddress()
  28. &&ip.getHostAddress().indexOf(COLON)==-1){
  29. returnip.getHostAddress();
  30. }
  31. }
  32. }
  33. return"";
  34. }catch(Exceptione){
  35. return"";
  36. }
  37. }
  38. }

--------------------------- 正文開始 -----------------------------------

這種實現非常簡單,具體的流程如下



對應的實現如下

Java程式碼收藏程式碼

  1. packagezk.lock;
  2. importzk.util.NetworkUtil;
  3. importzk.util.ZKUtil;
  4. /**
  5. *User:zhenghui
  6. *Date:14-3-26
  7. *Time:下午8:37
  8. *分散式鎖實現.
  9. *
  10. *這種實現的原理是,建立某一個任務的節點,比如/lock/tasckname然後獲取對應的值,如果是當前的Ip,那麼獲得鎖,如果不是,則沒獲得
  11. *.如果該節點不存在,則建立該節點,並把改節點的值設定成當前的IP
  12. */
  13. publicclassDistributedLock01{
  14. privateZKClientzkClient;
  15. publicstaticfinalStringLOCK_ROOT="/lock";
  16. privateStringlockName;
  17. publicDistributedLock01(StringconnectString,intsessionTimeout,StringlockName)throwsException{
  18. //先建立zk連結.
  19. this.createConnection(connectString,sessionTimeout);
  20. this.lockName=lockName;
  21. }
  22. publicbooleantryLock(){
  23. Stringpath=ZKUtil.contact(LOCK_ROOT,lockName);
  24. StringlocalIp=NetworkUtil.getNetworkAddress();
  25. try{
  26. if(zkClient.exists(path)){
  27. StringownnerIp=zkClient.getData(path);
  28. if(localIp.equals(ownnerIp)){
  29. returntrue;
  30. }
  31. }else{
  32. zkClient.createPathIfAbsent(path,false);
  33. if(zkClient.exists(path)){
  34. StringownnerIp=zkClient.getData(path);
  35. if(localIp.equals(ownnerIp)){
  36. returntrue;
  37. }
  38. }
  39. }
  40. }catch(Exceptione){
  41. e.printStackTrace();
  42. }
  43. returnfalse;
  44. }
  45. /**
  46. *建立zk連線
  47. *
  48. */
  49. protectedvoidcreateConnection(StringconnectString,intsessionTimeout)throwsException{
  50. if(zkClient!=null){
  51. releaseConnection();
  52. }
  53. zkClient=newZKClient(connectString,sessionTimeout);
  54. zkClient.createPathIfAbsent(LOCK_ROOT,true);
  55. }
  56. /**
  57. *關閉ZK連線
  58. */
  59. protectedvoidreleaseConnection()throwsInterruptedException{
  60. if(zkClient!=null){
  61. zkClient.close();
  62. }
  63. }
  64. }

總結

網上有很多文章,大家的方法大多數都是建立一個root根節點,每一個trylock的客戶端都會在root下建立一個EPHEMERAL_SEQUENTIAL 的子節點,同時設定root的child 變更watcher(為了避免羊群效應,可以只新增

前一個節點的變更通知) .如果建立的節點的序號是最小,則獲取到鎖,否則繼續等待root的child 變更

框架簡介:

本系統一款通用的SOA中介軟體平臺,用來開發各類J2EE企業級應用,節省時間和人力成本。本系統採用MVC模式、AOP引擎、任務排程器、工作流、Ajax、攔截器、過濾器、快取、日誌監控、資料訪問、表示式、國際化等技術。

使用者許可權系統:
組織結構:角色、使用者、使用者組、組織機構;許可權點:頁面、方法、按鈕、資料許可權、分級授權

專案管理新體驗:
快速出原型系統、元件樹、版本控制、模組移植、協同開發、實時監控、釋出管理

可持續整合:
所有元件可移植、可定製、可擴充,開發成果不斷積累,形成可持續發展的良性迴圈

框架/平臺構成:
Springmvc + Mybatis + Shiro(許可權)+SSO(單點登入) + Tiles(模板) +ActiveMQ(訊息佇列) + Rest(服務) + WebService(服務)+ EHcache(快取) + Lucene(搜尋引擎) + Quartz(定時排程)+ Html5(支援PC、IOS、Android)

支援平臺平臺:
Windows XP、Windows 7 、Windows 10 、 Linux 、 Unix

伺服器容器:
Tomcat 5/6/7 、Jetty、JBoss、WebSphere 8.5

專案原始碼結構截圖:

JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺

專案執行截圖:





JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺







JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺

JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺

JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺



JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺

JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺



JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺





JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺



JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺
JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺

JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺



JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺



JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺

JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺

JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺

JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺
JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺

JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺

JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺

JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺



JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺









JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺



JEESZ個人版Maven+SpringMVC+Mybatis+shiro+restful開發平臺

轉載於:https://my.oschina.net/grthrj/blog/729114