1. 程式人生 > 其它 >ZooKeeper分散式鎖和JavaAPI操作

ZooKeeper分散式鎖和JavaAPI操作

技術標籤:zookeeper

目錄

1 JavaAPI Curator介紹

•Curator 是 Apache ZooKeeper 的Java客戶端庫。

•常見的ZooKeeper Java API :

•原生Java API

•ZkClient

•Curator

•Curator 專案的目標是簡化 ZooKeeper 客戶端的使用。

•Curator 最初是 Netfix 研發的,後來捐獻了 Apache 基金會,目前是 Apache 的頂級專案。

•官網:http://curator.apache.org/

2 JavaAPI操作建立連線

1,搭建專案

建立專案curator-zk

引入pom和日誌檔案

資料資料夾下pom.xml和log4j.properties

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-gl7xdSYV-1607995210036)(assets\1592055569716.png)]

依賴

<dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</
artifactId
>
<version>4.10</version> <scope>test</scope> </dependency> <!--curator--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> <!--日誌--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.21</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.21</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>

2、建立測試類,使用curator連線zookeeper

@Before
public void testConnect() {
    //重試策略,3000每次連線時間,10表示次數。30秒獲取不到報錯
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
   
    //CuratorFrameworkFactory.builder();
    client = CuratorFrameworkFactory.builder()
        .connectString("localhost:2181")
        .sessionTimeoutMs(60 * 1000)//session超時時間,客戶端連線上服務端後60S沒有任何操作就中斷
        .connectionTimeoutMs(15 * 1000)//獲取連線的最長等待時間15
        .retryPolicy(retryPolicy)//15秒獲取不到連結,就啟用重試策略
        .namespace("itheima")//基於itheima接操作,itheima是根目錄/的一個節點,注意事項不要加/
        .build();
    //開啟連線
    client.start();
}

3 Zookeeper JavaAPI操作-建立節點

/**
* 建立節點:create 持久 臨時 順序 資料
* 1. 基本建立 :create().forPath("")
* 2. 建立節點 帶有資料:create().forPath("",data)
* 3. 設定節點的型別:create().withMode().forPath("",data)
* 4. 建立多級節點  /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
*/
@Test
public void testCreate() throws Exception {
    //2. 建立節點 帶有資料
    //如果建立節點,沒有指定資料,則預設將當前客戶端的ip作為資料儲存
    String path = client.create().forPath("/app2", "hehe".getBytes());
    System.out.println(path);
}
@Test
public void testCreate2() throws Exception {
    //1. 基本建立
    //如果建立節點,沒有指定資料,則預設將當前客戶端的ip作為資料儲存
    String path = client.create().forPath("/app1");
    System.out.println(path);
}
@Test
public void testCreate3() throws Exception {
    //3. 設定節點的型別
    //預設型別:持久化
    String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
    System.out.println(path);
}
@Test
public void testCreate4() throws Exception {
    //4. 建立多級節點  /app1/p1
    //creatingParentsIfNeeded():如果父節點不存在,則建立父節點
    String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
    System.out.println(path);
}

4 ZookeeperJavaAPI操作-查詢節點

/**
* 查詢節點:
* 1. 查詢資料:get: getData().forPath()
* 2. 查詢子節點: ls: getChildren().forPath()
* 3. 查詢節點狀態資訊:ls -s:getData().storingStatIn(狀態物件).forPath()
*/
@Test
public void testGet1() throws Exception {
    //1. 查詢資料:get
    byte[] data = client.getData().forPath("/app1");
    System.out.println(new String(data));
}
@Test
public void testGet2() throws Exception {
    // 2. 查詢子節點: ls
    List<String> path = client.getChildren().forPath("/");
    System.out.println(path);
}
@Test
public void testGet3() throws Exception {
    Stat status = new Stat();
    System.out.println(status);
    //3. 查詢節點狀態資訊:ls -s
    client.getData().storingStatIn(status).forPath("/app1");
    System.out.println(status);
}

5 Zookeeper JavaAPI操作-修改節點

/**
* 修改資料
* 1. 基本修改資料:setData().forPath()
* 2. 根據版本修改: setData().withVersion().forPath()
* * version 是通過查詢出來的。目的就是為了讓其他客戶端或者執行緒不干擾我。
*
* @throws Exception
*/
@Test
public void testSet() throws Exception {
	client.setData().forPath("/app1", "itcast".getBytes());
}
@Test
public void testSetForVersion() throws Exception {
    Stat status = new Stat();
    //3. 查詢節點狀態資訊:ls -s
    client.getData().storingStatIn(status).forPath("/app1");
    int version = status.getVersion();//查詢出來的 3
    System.out.println(version);
    client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
}

6 Zookeeper JavaAPI操作-刪除節點

/**
* 刪除節點: delete deleteall
* 1. 刪除單個節點:delete().forPath("/app1");
* 2. 刪除帶有子節點的節點:delete().deletingChildrenIfNeeded().forPath("/app1");
* 3. 必須成功的刪除:為了防止網路抖動。本質就是重試。  client.delete().guaranteed().forPath("/app2");
* 4. 回撥:inBackground
* @throws Exception
*/
@Test
public void testDelete() throws Exception {
    // 1. 刪除單個節點
    client.delete().forPath("/app1");
    
}
@Test
public void testDelete2() throws Exception {
    //2. 刪除帶有子節點的節點
    client.delete().deletingChildrenIfNeeded().forPath("/app4");
}
@Test
public void testDelete3() throws Exception {
    //3. 必須成功的刪除
    client.delete().guaranteed().forPath("/app2");
}
@Test
public void testDelete4() throws Exception {
    //4. 回撥
    client.delete().guaranteed().inBackground(new BackgroundCallback(){
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            System.out.println("我被刪除了~");
            System.out.println(event);
        }
    }).forPath("/app1");
}

7 Zookeeper JavaAPI操作-Watch監聽概述

•ZooKeeper 允許使用者在指定節點上註冊一些Watcher,並且在一些特定事件觸發的時候,ZooKeeper 服務端會將事件通知到感興趣的客戶端上去,該機制是 ZooKeeper 實現分散式協調服務的重要特性。

•ZooKeeper 中引入了Watcher機制來實現了釋出/訂閱功能能,能夠讓多個訂閱者同時監聽某一個物件,當一個物件自身狀態變化時,會通知所有訂閱者。

•ZooKeeper 原生支援通過註冊Watcher來進行事件監聽,但是其使用並不是特別方便

​ 需要開發人員自己反覆註冊Watcher,比較繁瑣。

•Curator引入了 Cache 來實現對 ZooKeeper 服務端事件的監聽。

•ZooKeeper提供了三種Watcher:

•NodeCache : 只是監聽某一個特定的節點

•PathChildrenCache : 監控一個ZNode的子節點.

•TreeCache : 可以監控整個樹上的所有節點,類似於PathChildrenCache和NodeCache的組合

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-6DJRtgzk-1607995210038)(assets\1592057429708.png)]

8 Zookeeper JavaAPI操作-Watch監聽-NodeCache

/**
* 演示 NodeCache:給指定一個節點註冊監聽器
*/
@Test
public void testNodeCache() throws Exception {
    //1. 建立NodeCache物件
    final NodeCache nodeCache = new NodeCache(client,"/app1");
    //2. 註冊監聽
   	nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            System.out.println("節點變化了~");
            //獲取修改節點後的資料
            byte[] data = nodeCache.getCurrentData().getData();
            System.out.println(new String(data));
            }
        });
    	//3. 開啟監聽.如果設定為true,則開啟監聽是,載入緩衝資料
    	nodeCache.start(true);
    	while (true){
    	}
}

9 Zookeeper JavaAPI操作-Watch監聽-PathChildrenCache

@Test
public void testPathChildrenCache() throws Exception {
    //1.建立監聽物件
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
    //2. 繫結監聽器
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {    			@Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            System.out.println("子節點變化了~");
            System.out.println(event);
            //監聽子節點的資料變更,並且拿到變更後的資料
            //1.獲取型別
            PathChildrenCacheEvent.Type type = event.getType();
            //2.判斷型別是否是update
            if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                System.out.println("資料變了!!!");
                byte[] data = event.getData().getData();
                System.out.println(new String(data));
            }
        }
    });
    //3. 開啟
    pathChildrenCache.start();
    while (true){
    }
}

10 Zookeeper JavaAPI操作-Watch監聽-TreeCache

/**
* 演示 TreeCache:監聽某個節點自己和所有子節點們
*/
@Test
public void testTreeCache() throws Exception {
    //1. 建立監聽器
    TreeCache treeCache = new TreeCache(client,"/app2");
    //2. 註冊監聽
    treeCache.getListenable().addListener(new TreeCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            System.out.println("節點變化了");
            System.out.println(event);
        }
    });
    //3. 開啟
    treeCache.start();
    while (true){
    }
}

11 Zookeeper分散式鎖-概念

•在我們進行單機應用開發,涉及併發同步的時候,我們往往採用synchronized或者Lock的方式來解決多執行緒間的程式碼同步問題,這時多執行緒的執行都是在同一個JVM之下,沒有任何問題。

•但當我們的應用是分散式叢集工作的情況下,屬於多JVM下的工作環境,跨JVM之間已經無法通過多執行緒的鎖解決同步問題。

•那麼就需要一種更加高階的鎖機制,來處理種跨機器的程序之間的資料同步問題——這就是分散式鎖。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-t0GDJm8G-1607995210039)(assets\1592057871141.png)]

12 Zookeeper 分散式鎖-zookeeper分散式鎖原理

•核心思想:當客戶端要獲取鎖,則建立節點,使用完鎖,則刪除該節點。

1.客戶端獲取鎖時,在lock節點下建立臨時順序節點。

2.然後獲取lock下面的所有子節點,客戶端獲取到所有的子節點之後,如果發現自己建立的子節點序號最小,那麼就認為該客戶端獲取到了鎖。使用完鎖後,將該節點刪除。

3.如果發現自己建立的節點並非lock所有子節點中最小的,說明自己還沒有獲取到鎖,此時客戶端需要找到比自己小的那個節點,同時對其註冊事件監聽器,監聽刪除事件。

4.如果發現比自己小的那個節點被刪除,則客戶端的

​ Watcher會收到相應通知,此時再次判斷自己建立的節點

​ 是否是lock子節點中序號最小的,如果是則獲取到了鎖,

​ 如果不是則重複以上步驟繼續獲取到比自己小的一個節點

​ 並註冊監聽。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-UrdQpK59-1607995210042)(assets\1592057925831.png)]

13 Zookeeper 分散式鎖-模擬12306售票案例

Curator實現分散式鎖API

  • 在Curator中有五種鎖方案:

    • InterProcessSemaphoreMutex:分散式排它鎖(非可重入鎖)

    • InterProcessMutex:分散式可重入排它鎖

    • InterProcessReadWriteLock:分散式讀寫鎖

    • InterProcessMultiLock:將多個鎖作為單個實體管理的容器

    • InterProcessSemaphoreV2:共享訊號量

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-7q9L9P1v-1607995210044)(assets\1592058017457.png)]

1,建立執行緒進行加鎖設定

public class Ticket12306 implements Runnable{
    private int tickets = 10;//資料庫的票數
    private InterProcessMutex lock ;
    @Override
    public void run() {
        while(true){
            //獲取鎖
            try {
            lock.acquire(3, TimeUnit.SECONDS);
                if(tickets > 0){
                    System.out.println(Thread.currentThread()+":"+tickets);
                    Thread.sleep(100);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                //釋放鎖
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
        }
    }
}

2,建立連線,並且初始化鎖

public Ticket12306(){
    //重試策略
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
    //2.第二種方式
    //CuratorFrameworkFactory.builder();
    CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString("192.168.149.135:2181")
        .sessionTimeoutMs(60 * 1000)
        .connectionTimeoutMs(15 * 1000)
        .retryPolicy(retryPolicy)
        .build();
    //開啟連線
    client.start();
    lock = new InterProcessMutex(client,"/lock");
}

3,執行多個執行緒進行測試

public class LockTest {
    public static void main(String[] args) {
        Ticket12306 ticket12306 = new Ticket12306();
        //建立客戶端
        Thread t1 = new Thread(ticket12306,"攜程");
        Thread t2 = new Thread(ticket12306,"飛豬");
        t1.start();
        t2.start();
    }
}