ZooKeeper配置及簡單使用
阿新 • • 發佈:2018-11-11
安裝並使用ZooKeeper API對Znode進行控制。
Zookeeper配置
下載zookeeper
首先在官網下載zookeeper:
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
解壓:
sudo tar xzvf zookeeper-3.4.13.tar.gz -C /usr/local
設定許可權:
chown -R hadoop:hadoop zookeeper
配置環境變數
export PATH= $PATH:/usr/local/zookeeper/bin
修改配置檔案
重新命名:
sudo mv zookeeper-3.4.13/ zookeeper
mv zoo_sample.cfg zoo.cfg
修改zoo.cfg
:
可以改conf檔案,建立多個埠-> 多個server:
啟動zookeeper
zkServer.sh start
驗證是否啟動
telnet localhost 2181
輸入start
,若看到Zookeeper version則說明啟動成功
輸入jps
看是否啟動:
關閉zookeeper
zkServer.sh stop
Zookeeper命令列
參考這裡。
進入命令列工具:
zkCli.sh -server
使用 ls 命令來檢視當前 ZooKeeper 中所包含的內容:
下面我們通過 set 命令來對 zk 所關聯的字串進行設定:
下面我們將剛才建立的 znode 刪除:
delete /zk
刪除節點:
rmr /zk
編寫Client程式
建立maven專案
修改pox.xml檔案:
- 參考官方配置
修改為:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Dase</groupId>
<artifactId>1</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
下載JUnitGenerator V2.0
在IDEA
中,JUnit已經預設下載好,我們需要新增JUnitGenerator V2.0外掛:
編寫程式
import java.util.List;
import org.apache.zookeeper.*;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
public class Simple {
private static final String connectString = "localhost:2181";
private static final int sessionTimeout = 2000;
private ZooKeeper zkClient = null;
public void init() throws Exception {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到事件通知後的回撥函式(應該是我們自己的事件處理邏輯)
System.out.println(event.getType() + "---" + event.getPath());
try {
zkClient.getChildren("/", true);
} catch (Exception e) {
}
}
});
}
/**
* 資料的增刪改查
*
* @throws InterruptedException
* @throws KeeperException
*/
// 建立資料節點到zk中
public void Create() throws KeeperException, InterruptedException {
// 引數1:要建立的節點的路徑 引數2:節點大資料 引數3:節點的許可權 引數4:節點的型別
String nodeCreated = zkClient.create("/scott", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//上傳的資料可以是任何型別,但都要轉成byte[]
}
//判斷znode是否存在
public void Exist() throws Exception{
Stat stat = zkClient.exists("/scott", false);
System.out.println(stat==null?"not exist":"exist");
}
// 獲取子節點
public void getChildren() throws Exception {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
Thread.sleep(Long.MAX_VALUE);
}
//獲取znode的資料
public void getData() throws Exception {
byte[] data = zkClient.getData("/scott", false, null);
System.out.println(new String(data));
}
//刪除znode
public void deleteZnode() throws Exception {
//引數2:指定要刪除的版本,-1表示刪除所有版本
zkClient.delete("/eclipse", -1);
}
//刪除znode
public void setData() throws Exception {
zkClient.setData("/scott", "imissyou angelababy".getBytes(), -1);
byte[] data = zkClient.getData("/scott", false, null);
System.out.println(new String(data));
}
}
Junit
使用Junit進行單元測試,這裡首先需要建立測試類:
使用快捷鍵alt+insert
:
會自動在test
中建立同名檔案:
修改該檔案:
package test;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import java.util.List;
/**
* Simple Tester.
*
* @author <Authors name>
* @since <pre>Nov 11, 2018</pre>
* @version 1.0
*/
public class SimpleTest {
private static final String connectString = "localhost:2181";
private static final int sessionTimeout = 2000;
private ZooKeeper zkClient = null;
@Before
public void testInit() throws Exception {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到事件通知後的回撥函式(應該是我們自己的事件處理邏輯)
System.out.println(event.getType() + "---" + event.getPath());
try {
zkClient.getChildren("/", true);
} catch (Exception e) {
}
}
});
String nodeCreated = zkClient.create("/scott", "hellozk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
/**
*
* Method: Exist()
*
*/
@Test
public void testExist() throws Exception {
Stat stat = zkClient.exists("/scott", false);
System.out.println(stat==null?"not exist":"exist");
}
/**
*
* Method: getChildren()
*
*/
@Test
public void testGetChildren() throws Exception {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
// Thread.sleep(Long.MAX_VALUE);
}
/**
*
* Method: getData()
*
*/
@Test
public void testGetData() throws Exception {
byte[] data = zkClient.getData("/scott", false, null);
System.out.println(new String(data));
}
/**
*
* Method: deleteZnode()
*
*/
@Test
public void testDeleteZnode() throws Exception {
zkClient.delete("/scott", -1);
}
/**
*
* Method: setData()
*
*/
@Test
public void testSetData() throws Exception {
zkClient.setData("/scott", "imissyou angelababy".getBytes(), -1);
byte[] data = zkClient.getData("/scott", false, null);
System.out.println(new String(data));
}
}
其中,@Before
表示在測試前執行,注意,JUnit不能指定@test
執行順序,如果非要指定,需要對函式名進行重新命名,具體參考這裡
測試
對每個函式進行測試:
如:對Exist
函式進行測試:
直到每個函式通過為止
體會分散式共享鎖的實現
編寫程式
在src/main/java
下建立DistributedClientLock
:
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class DistributedClientLock {
// 會話超時
private static final int SESSION_TIMEOUT = 2000;
// zookeeper叢集地址
private String hosts = "localhost:2181";
private String groupNode = "locks";
private String subNode = "sub";
private boolean haveLock = false;
private ZooKeeper zk;
// 記錄自己建立的子節點路徑
private volatile String thisPath;
/**
* 連線zookeeper
*/
private void connectZookeeper() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
try {
// 判斷事件型別,此處只處理子節點變化事件
if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
//獲取子節點,並對父節點進行監聽
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 去比較是否自己是最小id
Collections.sort(childrenNodes);
if (childrenNodes.indexOf(thisNode) == 0) {
//訪問共享資源處理業務,並且在處理完成之後刪除鎖
doSomething();
//重新註冊一把新的鎖
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 1、程式一進來就先註冊一把鎖到zk上
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小會,便於觀察
Thread.sleep(new Random().nextInt(1000));
// 從zk的鎖父目錄下,獲取所有子節點,並且註冊對父節點的監聽
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
//如果爭搶資源的程式就只有自己,則可以直接去訪問共享資源
if (childrenNodes.size() == 1) {
doSomething();
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
/**
* 處理業務邏輯,並且在最後釋放鎖
*/
private void doSomething() throws Exception {
try {
System.out.println("gain lock: " + thisPath);
Thread.sleep(2000);
// do something
} finally {
System.out.println("finished: " + thisPath);
//釋放鎖
zk.delete(this.thisPath, -1);
}
}
public static void main(String[] args) throws Exception {
DistributedClientLock dl = new DistributedClientLock();
dl.connectZookeeper();
Thread.sleep(Long.MAX_VALUE);
}
}
執行
首先需要在命令列中建立Znode:
create /locks locks
然後執行程式,程式會拿到鎖、釋放鎖不停交替執行: