2018-07-15期 ZK編程案例-分布式鎖【本人親自反復驗證通過分享】
1、編寫服務端Socket監聽程序,運行與某臺服務器上作為所有客戶端競爭資源
2、客戶端啟動後,都會自動向ZK註冊自己的身份信息,並將自己的身份ID根據統一的生成規則臨時寫入ZK
3、客戶端實時監聽ZK中自身註冊到ZK集群中身份ID變化,若發現自身ID為ZK集群最小的身份ID,則獲得鎖,然後向服務端Socket建立連接發送消息,其它客戶端處於監聽等待狀態
4、自己處理完自己業務後,即不發消息後,自己將自己舊的身份ID從ZK集群刪除,刪除成功後,重新註冊新的身份ID,目的是釋放鎖,讓其它客戶端獲得鎖。
5、若自己異常退出,則ZK集群會將該客戶端身份信息清除,防止客戶端身份ID不變一直獲得鎖,導致其它客戶端無法獲得鎖。
二、實現代碼
1、服務端監聽程序
package cn.itcast.zk.lock;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
/**
* 模擬客戶端競爭的服務資源,這裏開啟一個Socket監聽程序,並實時接收客戶端發送的消息。
* @author songjq
*
*/
public class TcpServerSocket {
public static void getConn() throws IOException {
ServerSocket server = new ServerSocket(9091);
try {
System.out.println("服務端已經在9091端口監聽......");
Socket client = server.accept();
try {
BufferedReader input =
new BufferedReader(new InputStreamReader(client.getInputStream()));
boolean flag = true;
while (flag) {
String line = input.readLine();
if (line.equals("exit")) {
flag = false;
} else {
System.out.println("客戶端說:" + line);
}
}
} finally {
client.close();
server.close();
TcpServerSocket.getConn();
}
} finally {
//server.close();
}
}
public static void main(String[] args) throws Exception {
TcpServerSocket.getConn();
}
}
2、客戶端訪問程序
package cn.itcast.zk.lock;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
/**
* zk實現分布式鎖
* 實現原理:
* 1、編寫服務端Socket監聽程序,運行與某臺服務器上作為所有客戶端競爭資源
* 2、客戶端啟動後,都會自動向ZK註冊自己的身份信息,並將自己的身份ID根據統一的生成規則臨時寫入ZK
* 3、客戶端實時監聽ZK中自身註冊到ZK集群中身份ID變化,若發現自身ID為ZK集群最小的身份ID,則獲得鎖,然後向服務端Socket建立連接發送消息,其它客戶端處於監聽等待狀態
* 4、自己處理完自己業務後,即不發消息後,自己將自己舊的身份ID從ZK集群刪除,刪除成功後,重新註冊新的身份ID,目的是釋放鎖,讓其它客戶端獲得鎖。
* 5、若自己異常退出,則ZK集群會將該客戶端身份信息清除,防止客戶端身份ID不變一直獲得鎖,導致其它客戶端無法獲得鎖。
* @author songjq
*/
public class ZKDistributeLockService {
private final static String rootNode = "/hosts";
private static String myZnodePath = "";
private static ZooKeeper zkCli = null;
private static boolean havelock = false;
private static String hostname = "";
/**
* 獲取zk連接
*
* @throws IOException
*/
public static ZooKeeper getZkconnection() throws IOException {
return new ZooKeeper("192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181,", 2000, getWather());
}
/**
* 監聽器處理
*/
public static Watcher getWather() {
return new Watcher() {
@Override
public void process(WatchedEvent event) {
//如果沒有子節點變更,則終止下面程序執行
if(event.getType() != EventType.NodeChildrenChanged) return ;
// 獲取鎖
try {
havelock = gainLock();
if (havelock) {
System.out.println(new Date() + ":" + hostname + " get lock....");
// 處理業務
doSomethings(hostname);
// 處理完業務後刪除鎖,並重新註冊znode節點鎖
deleteLock();
// 重新註冊znode節點鎖
registerZnodeLock();
}
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
};
}
/**
* 註冊鎖節點
*
* @throws KeeperException
* @throws InterruptedException
*/
public static void registerZnodeLock() throws KeeperException, InterruptedException {
if (zkCli.exists("/hosts", null) == null) {
zkCli.create("/hosts", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
myZnodePath = zkCli.create(rootNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 註冊完鎖節點後睡眠500-1000ms後在獲取鎖
Thread.sleep((long) (Math.random() * 500 + 500));
}
/**
* 獲取鎖,判斷當前註冊myZnodePath是否和zk集群中最小的節點一致,若一致就獲得鎖
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public static boolean gainLock() throws KeeperException, InterruptedException {
List<String> children = zkCli.getChildren(rootNode, true);
// 若當前集群內znode節點數為1,說明是自己剛註冊的節點,可以直接獲得鎖
/*if (children.size() == 1) {
return true;
}*/
if (children.size() > 0) {
// 對znode節點進行排序
Collections.sort(children);
String tmpZnode = children.get(0);
System.out.println("tmpZnode:"+tmpZnode+",myZnodePath:"+myZnodePath);
if (tmpZnode.equals(myZnodePath.substring(rootNode.length()+1))) {
return true;
}else {
return false;
}
}else {
return false;
}
}
/**
* 模擬處理業務
* 這裏即為獲得鎖後向同服務端建立socket連接,並向服務端發送消息。
* @param hostname
* @throws InterruptedException
* @throws IOException
* @throws UnknownHostException
*/
public static void doSomethings(String hostname) throws InterruptedException, UnknownHostException, IOException {
System.out.println("------------------>" + hostname + " Begin connect to Server and send msg to Server...");
// 模擬業務處理2-3秒
Thread.sleep((long) (Math.random() * 5000 + 1000));
/**
* 在這裏實現發送消息代碼
*/
Socket client = new Socket("127.0.0.1", 9091);
try {
PrintWriter output = new PrintWriter(client.getOutputStream(), true);
/*
* 向服務端發送消息
*/
String words = "Client MSG-> This is from " + hostname +" message.";
output.println(words);
} finally {
client.close();
}
System.out.println("------------------>" + hostname + " Msg Send completed!!!");
}
/**
* 刪除myZnodePath節點
*
* @throws InterruptedException
* @throws KeeperException
*/
public static void deleteLock() throws InterruptedException, KeeperException {
zkCli.delete(myZnodePath, -1);
}
/**
* 主類調用
* @param args
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
hostname = args[0];
// 獲取zk連接
zkCli = ZKDistributeLockService.getZkconnection();
// 註冊znode節點鎖
ZKDistributeLockService.registerZnodeLock();
// 獲取鎖
havelock = ZKDistributeLockService.gainLock();
if (havelock) {
System.out.println(new Date() + ":" + hostname + " get lock....");
// 處理業務
ZKDistributeLockService.doSomethings(hostname);
// 處理完業務後刪除鎖,並重新註冊znode節點鎖
ZKDistributeLockService.deleteLock();
// 重新註冊znode節點鎖
ZKDistributeLockService.registerZnodeLock();
}
//主程序睡眠
Thread.sleep(Long.MAX_VALUE);
}
}
2018-07-15期 ZK編程案例-分布式鎖【本人親自反復驗證通過分享】