2018-07-14期 ZK編程案例-分布式協調【本人親自反復驗證通過分享】
利用ZK監聽器實現分布式協調服務,即實現服務端服務健康狀態的實時監測。
1、編寫一個服務端程序,實現原理:
(1)服務端程序啟動後,開啟Socket監聽
(2)開啟Socket監聽後,將自己監聽Socket身份信息臨時寫入Zookeeper集群
(3)服務關閉後,Zookeeper集群自動將該服務身份信息從ZK集群清除
實現代碼
package cn.itcast.zk.distributeserver;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Enumeration;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class DistributeServer {
/**
* 連接zk,服務啟動後往zk註冊服務器信息,並啟動監聽端口9091
*
* @throws Exception
*/
public void connectionZk(String zNodeVal) throws Exception {
/*
* 這裏連接ZK,無需註冊監聽器,所有監聽器watcher為null
*/
ZooKeeper zkCli = new ZooKeeper("192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181", 2000, null);
if (zkCli.exists("/server", null) == null) {
zkCli.create("/server", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
/*
* 服務啟動後,註冊自己的身份信息到ZK,註冊方式采用臨時註冊EPHEMERAL_SEQUENTIAL,同時生成的Znode為在/server/host+
* 自增序列號SEQUENTIAL 如/server/host00000000000001
*/
zkCli.create("/server/host", zNodeVal.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
/**
* 主線程 模擬服務端socket監聽,並註冊ZK集群
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
DistributeServer distributeServer = new DistributeServer();
InetAddress localAddress = new DistributeServer().getLocalHostLANAddress();
String zNodeVal = localAddress.getHostName() + ":" + localAddress.getHostAddress() + ":9091";
// 服務啟動後,將服務信息註冊到zk
distributeServer.connectionZk(zNodeVal);
// 將服務端口監聽起來,實時準備處理客戶端發送過來消息
distributeServer.handleBusiness(localAddress.getHostAddress());
}
/**
* 模擬處理業務 這裏為開啟Socket監聽,並接收客戶端發送過來的消息。
*
* @param ipaddress
* @throws IOException
*/
public static void handleBusiness(String ipaddress) throws IOException {
ServerSocket server = new ServerSocket(9091);
try {
System.out.println("Server " + ipaddress + " has listener on 9091......");
Socket client = server.accept();
try {
BufferedReader input = new BufferedReader(new InputStreamReader(client.getInputStream()));
boolean flag = true;
while (flag) {
String line = input.readLine();
if (line.equals("exit")) {
flag = false;
System.out.println("Client exit!");
} else {
System.out.println("Client Msg:" + line);
}
}
} finally {
client.close();
server.close();
/**
* 防止客戶端連接斷開後,服務端端監聽異常,因此這裏沒每次處理完客戶端消息後,都會重新建立監聽
*/
DistributeServer.handleBusiness(ipaddress);
}
} finally {
server.close();
}
}
/**
* 自動獲取操作系統IP地址,目的是讓服務端socket在該地址上建立監聽。
*
* @return
* @throws Exception
*/
public InetAddress getLocalHostLANAddress() throws Exception {
try {
InetAddress candidateAddress = null;
// 遍歷所有的網絡接口
for (Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces(); ifaces
.hasMoreElements();) {
NetworkInterface iface = (NetworkInterface) ifaces.nextElement();
// 在所有的接口下再遍歷IP
for (Enumeration inetAddrs = iface.getInetAddresses(); inetAddrs.hasMoreElements();) {
InetAddress inetAddr = (InetAddress) inetAddrs.nextElement();
if (!inetAddr.isLoopbackAddress()) {// 排除loopback類型地址
if (inetAddr.isSiteLocalAddress()) {
// 如果是site-local地址,就是它了
return inetAddr;
} else if (candidateAddress == null) {
// site-local類型的地址未被發現,先記錄候選地址
candidateAddress = inetAddr;
}
}
}
}
if (candidateAddress != null) {
return candidateAddress;
}
// 如果沒有發現 non-loopback地址.只能用最次選的方案
InetAddress jdkSuppliedAddress = InetAddress.getLocalHost();
return jdkSuppliedAddress;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
2、客戶端程序,實現原理:
利用ZK監聽器實現分布式協調服務,即實現服務端服務健康狀態的實時監測。
(1)、服務端服務啟動後,會立即向ZK註冊自身服務標識信息
(2)、若服務端服務掉線,則會ZK會自動將該服務標識信息從ZK集群清除
(3)、客戶端實時監聽ZK集群中服務在線情況,若服務端服務掉線,會被客戶端監聽器立即監聽到
(4)、客戶端監聽到服務端服務掉線後,相關後續業務不會再提交給掉線服務處理
實現代碼:
package cn.itcast.zk.distributeserver;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper.States;
/**
* 利用ZK監聽器實現分布式協調服務,即實現服務端服務健康狀態的實時監測。
* 實現原理:
* 1、服務端服務啟動後,會立即向ZK註冊自身服務標識信息
* 2、若服務端服務掉線,則會ZK會自動將該服務標識信息從ZK集群清除
* 3、客戶端實時監聽ZK集群中服務在線情況,若服務端服務掉線,會被客戶端監聽器立即監聽到
* 4、客戶端監聽到服務端服務掉線後,先關後續業務不會再提交給掉線服務處理
* 5、本代碼具體主要實現以下功能:
* (1)服務端程序啟動後,會立即啟動一個Socket監聽,並將Socket連接信息註冊到ZK集群,註冊內容為IP:PORT套接字
* (2)客戶端通過ZK一直監聽服務端在線情況
* (3)客戶端定時向各個服務端發送Socket消息,默認情況下如果所有服務端服務均正常,則所有服務端都會收到客戶端發送的Socket消息
* 若服務端服務異常,則客戶端會檢測到,將不會向掉線的服務發送socket消息。這樣就保障了客戶端發送的消息不會丟失,總有活動的
* 服務端能接收到。
*
* @author songjq
*
*/
public class DistributeClient {
// 為了線程安全,需要加volatile修飾符
volatile private static ArrayList<String> hlist = new ArrayList<String>();
private static ZooKeeper zk1_;
public static void waitUntilConnected(ZooKeeper zooKeeper, CountDownLatch connectedLatch) {
if (States.CONNECTING == zooKeeper.getState()) {
try {
connectedLatch.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
static class ConnectedWatcher implements Watcher {
private CountDownLatch connectedLatch;
ConnectedWatcher(CountDownLatch connectedLatch) {
this.connectedLatch = connectedLatch;
}
/**
* 監聽器回調方法 如果需要對某個znode進行持續監聽,需要重新在回調方法中註冊監聽器
*/
@Override
public void process(WatchedEvent event) {
System.out.println("節點:" + event.getPath() + " 發生了事件:" + event.getType());
if (event.getState() == KeeperState.SyncConnected) {
connectedLatch.countDown();
}
try {
/**
* 這裏調用獲取葉子加點的方法,目的是在該監聽器中重新註冊監聽,防止監聽失效。
*/
getServerHostList(zk1_);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 獲取zk連接
* @param hostports
* @param times
* @return
* @throws Exception
*/
public ZooKeeper getConnection(String hostports, int times) throws Exception {
ZooKeeper zktmp = new ZooKeeper(hostports, 1000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("節點:" + event.getPath() + " 發生了事件:" + event.getType());
try {
getServerHostList(zk1_);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
return zktmp;
}
/**
* 獲取類的實例
*/
static private DistributeClient static_;
static public DistributeClient Instance() {
if (static_ == null) {
static_ = new DistributeClient();
}
return static_;
}
public static void main(String[] args) throws Exception {
String hostports = "192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181";
DistributeClient instance = DistributeClient.Instance();
zk1_ = instance.getConnection(hostports, 2000);
getServerHostList(zk1_);
/*
* 模擬客戶端永不間斷的定時向活動的服務端發送socket消息
*/
while (true) {
getServerHostList(zk1_);
for (String host : hlist) {
String[] hostAry = host.split(":");
String hostname = hostAry[0];
String ip = hostAry[1];
int port = Integer.parseInt(hostAry[2]);
new DistributeClient().handleBusiness(ip, port, hostname);
System.out.println(host);
}
Thread.sleep(20000);
System.out.println("---------------");
}
}
/**
* 獲取在線服務節點
* @param zkCli
* @throws KeeperException
* @throws InterruptedException
*/
public static void getServerHostList(ZooKeeper zkCli) throws KeeperException, InterruptedException {
hlist.clear();
/*
* 獲取/server/葉子節點,並同時在該葉子節點註冊監聽器
*/
List<String> hosts = zkCli.getChildren("/server", true);
for (String host : hosts) {
byte[] data = zkCli.getData("/server/" + host, null, null);
hlist.add(new String(data));
}
}
/**
* 模擬處理業務
* 向活動的服務端發送Socket消息
* @throws IOException
*/
public void handleBusiness(String IP, int port, String hostname) throws IOException {
Socket client = new Socket(IP, port);
try {
PrintWriter output = new PrintWriter(client.getOutputStream(), true);
/*
* 向服務端發送消息
*/
String words = "Client MSG->" + hostname + "," + IP;
output.println(words);
} finally {
client.close();
}
}
}
2018-07-14期 ZK編程案例-分布式協調【本人親自反復驗證通過分享】