ZooKeeper分散式鎖和JavaAPI操作
技術標籤:zookeeper
目錄
- 1 JavaAPI Curator介紹
- 2 JavaAPI操作建立連線
- 3 Zookeeper JavaAPI操作-建立節點
- 4 ZookeeperJavaAPI操作-查詢節點
- 5 Zookeeper JavaAPI操作-修改節點
- 6 Zookeeper JavaAPI操作-刪除節點
- 7 Zookeeper JavaAPI操作-Watch監聽概述
- 8 Zookeeper JavaAPI操作-Watch監聽-NodeCache
- 9 Zookeeper JavaAPI操作-Watch監聽-PathChildrenCache
- 10 Zookeeper JavaAPI操作-Watch監聽-TreeCache
- 11 Zookeeper分散式鎖-概念
- 12 Zookeeper 分散式鎖-zookeeper分散式鎖原理
- 13 Zookeeper 分散式鎖-模擬12306售票案例
1 JavaAPI Curator介紹
•Curator 是 Apache ZooKeeper 的Java客戶端庫。
•常見的ZooKeeper Java API :
•原生Java API
•ZkClient
•Curator
•Curator 專案的目標是簡化 ZooKeeper 客戶端的使用。
•Curator 最初是 Netfix 研發的,後來捐獻了 Apache 基金會,目前是 Apache 的頂級專案。
•官網:http://curator.apache.org/
2 JavaAPI操作建立連線
1,搭建專案
建立專案curator-zk
引入pom和日誌檔案
資料資料夾下pom.xml和log4j.properties
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-gl7xdSYV-1607995210036)(assets\1592055569716.png)]
依賴
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</ artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<!--curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<!--日誌-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
2、建立測試類,使用curator連線zookeeper
@Before
public void testConnect() {
//重試策略,3000每次連線時間,10表示次數。30秒獲取不到報錯
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//CuratorFrameworkFactory.builder();
client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.sessionTimeoutMs(60 * 1000)//session超時時間,客戶端連線上服務端後60S沒有任何操作就中斷
.connectionTimeoutMs(15 * 1000)//獲取連線的最長等待時間15
.retryPolicy(retryPolicy)//15秒獲取不到連結,就啟用重試策略
.namespace("itheima")//基於itheima接操作,itheima是根目錄/的一個節點,注意事項不要加/
.build();
//開啟連線
client.start();
}
3 Zookeeper JavaAPI操作-建立節點
/**
* 建立節點:create 持久 臨時 順序 資料
* 1. 基本建立 :create().forPath("")
* 2. 建立節點 帶有資料:create().forPath("",data)
* 3. 設定節點的型別:create().withMode().forPath("",data)
* 4. 建立多級節點 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
*/
@Test
public void testCreate() throws Exception {
//2. 建立節點 帶有資料
//如果建立節點,沒有指定資料,則預設將當前客戶端的ip作為資料儲存
String path = client.create().forPath("/app2", "hehe".getBytes());
System.out.println(path);
}
@Test
public void testCreate2() throws Exception {
//1. 基本建立
//如果建立節點,沒有指定資料,則預設將當前客戶端的ip作為資料儲存
String path = client.create().forPath("/app1");
System.out.println(path);
}
@Test
public void testCreate3() throws Exception {
//3. 設定節點的型別
//預設型別:持久化
String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
System.out.println(path);
}
@Test
public void testCreate4() throws Exception {
//4. 建立多級節點 /app1/p1
//creatingParentsIfNeeded():如果父節點不存在,則建立父節點
String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
System.out.println(path);
}
4 ZookeeperJavaAPI操作-查詢節點
/**
* 查詢節點:
* 1. 查詢資料:get: getData().forPath()
* 2. 查詢子節點: ls: getChildren().forPath()
* 3. 查詢節點狀態資訊:ls -s:getData().storingStatIn(狀態物件).forPath()
*/
@Test
public void testGet1() throws Exception {
//1. 查詢資料:get
byte[] data = client.getData().forPath("/app1");
System.out.println(new String(data));
}
@Test
public void testGet2() throws Exception {
// 2. 查詢子節點: ls
List<String> path = client.getChildren().forPath("/");
System.out.println(path);
}
@Test
public void testGet3() throws Exception {
Stat status = new Stat();
System.out.println(status);
//3. 查詢節點狀態資訊:ls -s
client.getData().storingStatIn(status).forPath("/app1");
System.out.println(status);
}
5 Zookeeper JavaAPI操作-修改節點
/**
* 修改資料
* 1. 基本修改資料:setData().forPath()
* 2. 根據版本修改: setData().withVersion().forPath()
* * version 是通過查詢出來的。目的就是為了讓其他客戶端或者執行緒不干擾我。
*
* @throws Exception
*/
@Test
public void testSet() throws Exception {
client.setData().forPath("/app1", "itcast".getBytes());
}
@Test
public void testSetForVersion() throws Exception {
Stat status = new Stat();
//3. 查詢節點狀態資訊:ls -s
client.getData().storingStatIn(status).forPath("/app1");
int version = status.getVersion();//查詢出來的 3
System.out.println(version);
client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
}
6 Zookeeper JavaAPI操作-刪除節點
/**
* 刪除節點: delete deleteall
* 1. 刪除單個節點:delete().forPath("/app1");
* 2. 刪除帶有子節點的節點:delete().deletingChildrenIfNeeded().forPath("/app1");
* 3. 必須成功的刪除:為了防止網路抖動。本質就是重試。 client.delete().guaranteed().forPath("/app2");
* 4. 回撥:inBackground
* @throws Exception
*/
@Test
public void testDelete() throws Exception {
// 1. 刪除單個節點
client.delete().forPath("/app1");
}
@Test
public void testDelete2() throws Exception {
//2. 刪除帶有子節點的節點
client.delete().deletingChildrenIfNeeded().forPath("/app4");
}
@Test
public void testDelete3() throws Exception {
//3. 必須成功的刪除
client.delete().guaranteed().forPath("/app2");
}
@Test
public void testDelete4() throws Exception {
//4. 回撥
client.delete().guaranteed().inBackground(new BackgroundCallback(){
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("我被刪除了~");
System.out.println(event);
}
}).forPath("/app1");
}
7 Zookeeper JavaAPI操作-Watch監聽概述
•ZooKeeper 允許使用者在指定節點上註冊一些Watcher,並且在一些特定事件觸發的時候,ZooKeeper 服務端會將事件通知到感興趣的客戶端上去,該機制是 ZooKeeper 實現分散式協調服務的重要特性。
•ZooKeeper 中引入了Watcher機制來實現了釋出/訂閱功能能,能夠讓多個訂閱者同時監聽某一個物件,當一個物件自身狀態變化時,會通知所有訂閱者。
•ZooKeeper 原生支援通過註冊Watcher來進行事件監聽,但是其使用並不是特別方便
需要開發人員自己反覆註冊Watcher,比較繁瑣。
•Curator引入了 Cache 來實現對 ZooKeeper 服務端事件的監聽。
•ZooKeeper提供了三種Watcher:
•NodeCache : 只是監聽某一個特定的節點
•PathChildrenCache : 監控一個ZNode的子節點.
•TreeCache : 可以監控整個樹上的所有節點,類似於PathChildrenCache和NodeCache的組合
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-6DJRtgzk-1607995210038)(assets\1592057429708.png)]
8 Zookeeper JavaAPI操作-Watch監聽-NodeCache
/**
* 演示 NodeCache:給指定一個節點註冊監聽器
*/
@Test
public void testNodeCache() throws Exception {
//1. 建立NodeCache物件
final NodeCache nodeCache = new NodeCache(client,"/app1");
//2. 註冊監聽
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("節點變化了~");
//獲取修改節點後的資料
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//3. 開啟監聽.如果設定為true,則開啟監聽是,載入緩衝資料
nodeCache.start(true);
while (true){
}
}
9 Zookeeper JavaAPI操作-Watch監聽-PathChildrenCache
@Test
public void testPathChildrenCache() throws Exception {
//1.建立監聽物件
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
//2. 繫結監聽器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("子節點變化了~");
System.out.println(event);
//監聽子節點的資料變更,並且拿到變更後的資料
//1.獲取型別
PathChildrenCacheEvent.Type type = event.getType();
//2.判斷型別是否是update
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("資料變了!!!");
byte[] data = event.getData().getData();
System.out.println(new String(data));
}
}
});
//3. 開啟
pathChildrenCache.start();
while (true){
}
}
10 Zookeeper JavaAPI操作-Watch監聽-TreeCache
/**
* 演示 TreeCache:監聽某個節點自己和所有子節點們
*/
@Test
public void testTreeCache() throws Exception {
//1. 建立監聽器
TreeCache treeCache = new TreeCache(client,"/app2");
//2. 註冊監聽
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("節點變化了");
System.out.println(event);
}
});
//3. 開啟
treeCache.start();
while (true){
}
}
11 Zookeeper分散式鎖-概念
•在我們進行單機應用開發,涉及併發同步的時候,我們往往採用synchronized或者Lock的方式來解決多執行緒間的程式碼同步問題,這時多執行緒的執行都是在同一個JVM之下,沒有任何問題。
•但當我們的應用是分散式叢集工作的情況下,屬於多JVM下的工作環境,跨JVM之間已經無法通過多執行緒的鎖解決同步問題。
•那麼就需要一種更加高階的鎖機制,來處理種跨機器的程序之間的資料同步問題——這就是分散式鎖。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-t0GDJm8G-1607995210039)(assets\1592057871141.png)]
12 Zookeeper 分散式鎖-zookeeper分散式鎖原理
•核心思想:當客戶端要獲取鎖,則建立節點,使用完鎖,則刪除該節點。
1.客戶端獲取鎖時,在lock節點下建立臨時順序節點。
2.然後獲取lock下面的所有子節點,客戶端獲取到所有的子節點之後,如果發現自己建立的子節點序號最小,那麼就認為該客戶端獲取到了鎖。使用完鎖後,將該節點刪除。
3.如果發現自己建立的節點並非lock所有子節點中最小的,說明自己還沒有獲取到鎖,此時客戶端需要找到比自己小的那個節點,同時對其註冊事件監聽器,監聽刪除事件。
4.如果發現比自己小的那個節點被刪除,則客戶端的
Watcher會收到相應通知,此時再次判斷自己建立的節點
是否是lock子節點中序號最小的,如果是則獲取到了鎖,
如果不是則重複以上步驟繼續獲取到比自己小的一個節點
並註冊監聽。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-UrdQpK59-1607995210042)(assets\1592057925831.png)]
13 Zookeeper 分散式鎖-模擬12306售票案例
Curator實現分散式鎖API
-
在Curator中有五種鎖方案:
-
InterProcessSemaphoreMutex:分散式排它鎖(非可重入鎖)
-
InterProcessMutex:分散式可重入排它鎖
-
InterProcessReadWriteLock:分散式讀寫鎖
-
InterProcessMultiLock:將多個鎖作為單個實體管理的容器
-
InterProcessSemaphoreV2:共享訊號量
-
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-7q9L9P1v-1607995210044)(assets\1592058017457.png)]
1,建立執行緒進行加鎖設定
public class Ticket12306 implements Runnable{
private int tickets = 10;//資料庫的票數
private InterProcessMutex lock ;
@Override
public void run() {
while(true){
//獲取鎖
try {
lock.acquire(3, TimeUnit.SECONDS);
if(tickets > 0){
System.out.println(Thread.currentThread()+":"+tickets);
Thread.sleep(100);
tickets--;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
//釋放鎖
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
2,建立連線,並且初始化鎖
public Ticket12306(){
//重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//2.第二種方式
//CuratorFrameworkFactory.builder();
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.149.135:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.build();
//開啟連線
client.start();
lock = new InterProcessMutex(client,"/lock");
}
3,執行多個執行緒進行測試
public class LockTest {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
//建立客戶端
Thread t1 = new Thread(ticket12306,"攜程");
Thread t2 = new Thread(ticket12306,"飛豬");
t1.start();
t2.start();
}
}