ZooKeeper 分散式過程協同技術詳解 —— 讀書筆記
阿新 • • 發佈:2019-02-19
一、Master.java
package com.dayw.zk; import java.io.IOException; import java.util.Random; import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.AsyncCallback.StringCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Master implements Watcher { static final Logger LOG = LoggerFactory.getLogger(Master.class); ZooKeeper zk; static String hostPort; public Master(String host) { super(); this.hostPort = host; } void startZK(String host) { try { zk = new ZooKeeper(host, 15000, this); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } void stopZK() { try { zk.close(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void process(WatchedEvent e) { System.out.println(e); } Random ran = new Random(); String serverId = Integer.toHexString(ran.nextInt()); static boolean isLeader = false; StringCallback masterCreateCallback = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { switch (Code.get(rc)) { case CONNECTIONLOSS: checkMaster(); return; case OK: isLeader = true; break; default: isLeader = false; break; } System.out.println("I'm " + (isLeader ? 
"" : "not ") + "the leader"); } }; DataCallback masterCheckCallback = new DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { switch (Code.get(rc)) { case CONNECTIONLOSS: checkMaster(); return; case NONODE: try { runForMaster(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return; default: break; } } }; void checkMaster() { zk.getData("/master", false, masterCheckCallback, null); /** * 同步 * while (true) { Stat stat = new Stat(); try { byte data[] = zk.getData("/master", false, stat); isLeader = new String(data).equals(serverId); return true; } catch (KeeperException e) { return false; } catch (InterruptedException e) { } } */ } void runForMaster() throws InterruptedException { /** 非同步 */ zk.create("/master", serverId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, masterCreateCallback, null); /** 同步 * while (true) { try { zk.create("/master", serverId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); isLeader = true; break; } catch (NodeExistsException e) { isLeader = false; break; } catch (KeeperException e) { } if (checkMaster()) break; } */ } public void bootstrap() { createParent("/workers", new byte[0]); createParent("/assign", new byte[0]); createParent("/tasks", new byte[0]); createParent("/status", new byte[0]); } StringCallback createParentCallback = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { switch (Code.get(rc)) { case CONNECTIONLOSS: createParent(path, (byte[]) ctx); break; case OK: LOG.info("Parent created"); break; case NODEEXISTS: LOG.warn("Parent already registerd: " + path); break; default: LOG.error("Something went wrong: " + KeeperException.create(Code.get(rc), path)); break; } } }; void createParent(String path, byte[] data) { zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createParentCallback, data); } public static void 
main(String[] args) throws Exception { Master z = new Master("172.16.16.176,2181"); z.startZK(hostPort); z.runForMaster(); if (isLeader) { System.out.println("I'm the leader"); Thread.sleep(60000); } else { System.out.println("Someone else is the leader"); } z.stopZK(); } }
二、Worker.java
package com.dayw.zk; import java.io.IOException; import java.util.Random; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.AsyncCallback.StringCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Worker implements Watcher { private static final Logger LOG = LoggerFactory.getLogger(Worker.class); ZooKeeper zk; String hostPort; String status; String serverId = Integer.toHexString(new Random().nextInt()); public Worker(String hostPort) { this.hostPort = hostPort; } void startZK() throws IOException { zk = new ZooKeeper(hostPort, 1500, this); } @Override public void process(WatchedEvent event) { LOG.info(event.toString() + ", " + hostPort); } StringCallback createWorkerCallback = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { switch (Code.get(rc)) { case CONNECTIONLOSS: register(); break; case OK: LOG.info("Registered successfully:" + serverId); break; case NODEEXISTS: LOG.warn("Already registered:" + serverId); break; default: LOG.error("Something went wrong:" + KeeperException.create(Code.get(rc), path)); break; } } }; void register() { zk.create("/workers/worker-" + serverId, "Idle".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, createWorkerCallback, null); } /** setStatus */ StatCallback statusUpdateCallback = new StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { switch (Code.get(rc)) { case CONNECTIONLOSS: updateStatus("test" , (String)ctx); return; default: break; } } }; synchronized private void updateStatus(String name, String status) { if (status == 
this.status) { zk.setData("/workers" + name, status.getBytes(), -1, statusUpdateCallback, status); } } public void setStatus(String status) { this.status = status; updateStatus("test", status); } public static void main(String[] args) throws Exception { Worker w = new Worker(args[0]); w.startZK(); w.register(); Thread.sleep(30000); } }
三、Client.java
package com.dayw.zk; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.ConnectionLossException; import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Client implements Watcher { private static final Logger LOG = LoggerFactory.getLogger(Client.class); ZooKeeper zk; String hostPort; Client(String hostPort) { this.hostPort = hostPort; } void startZK() throws Exception { zk = new ZooKeeper(hostPort, 15000, this); } String queueCommand(String command) throws KeeperException { String name = null; while (true) { try { name = zk.create("/tasks/task-", command.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return name; } catch (NodeExistsException e) { LOG.error(name + " already appears to be running"); } catch (ConnectionLossException e) { } catch (InterruptedException e) { // TODO: handle exception } } } @Override public void process(WatchedEvent event) { // TODO Auto-generated method stub } }
四、AdminClient.java
package com.dayw.zk;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Administrative client for the master/worker example: prints the current master, the
 * registered workers with their status, and the children of {@code /assign}.
 */
public class AdminClient implements Watcher {
    private static final Logger LOG = LoggerFactory.getLogger(AdminClient.class);

    ZooKeeper zk;
    String hostPort;

    /**
     * Creates an admin client.
     *
     * @param hostPort ZooKeeper connect string
     */
    AdminClient(String hostPort) {
        this.hostPort = hostPort;
    }

    /**
     * Opens the ZooKeeper session with this object as the default watcher.
     *
     * @throws Exception if the client cannot be created
     */
    void start() throws Exception {
        zk = new ZooKeeper(hostPort, 15000, this);
    }

    /**
     * Prints a snapshot of the system state to standard output.
     *
     * @throws KeeperException on a ZooKeeper error other than a missing {@code /master} or a
     *         worker znode that disappeared while being listed
     * @throws InterruptedException if the calling thread is interrupted
     */
    void listState() throws KeeperException, InterruptedException {
        /* Master */
        try {
            Stat stat = new Stat();
            byte[] masterData = zk.getData("/master", false, stat);
            Date startDate = new Date(stat.getCtime());
            System.out.println("Master: " + asText(masterData) + " since " + startDate);
        } catch (NoNodeException e) {
            System.out.println("No Master");
        }

        /* Workers */
        System.out.println("Workers:");
        for (String w : zk.getChildren("/workers", false)) {
            try {
                String state = asText(zk.getData("/workers/" + w, false, null));
                System.out.println("\t" + w + ": " + state);
            } catch (NoNodeException e) {
                // The znode vanished between getChildren and getData; skip it and carry on.
                LOG.info("Worker {} disappeared while listing", w);
            }
        }

        /* Tasks: lists the children of /assign. */
        System.out.println("Tasks:");
        for (String t : zk.getChildren("/assign", false)) {
            System.out.println("\t" + t);
        }
    }

    /** Decodes znode data as UTF-8 text, treating a missing payload as the empty string. */
    private static String asText(byte[] data) {
        return data == null ? "" : new String(data, StandardCharsets.UTF_8);
    }

    /**
     * Prints every event delivered to the default watcher.
     *
     * @param event the watched event
     */
    @Override
    public void process(WatchedEvent event) {
        System.out.println(event);
    }

    /**
     * Connects, prints the state once and closes the session.
     *
     * @param args {@code args[0]} is the ZooKeeper connect string
     * @throws Exception if the client cannot be created or a ZooKeeper call fails
     */
    public static void main(String args[]) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: AdminClient <host:port[,host:port...]>");
            return;
        }
        AdminClient c = new AdminClient(args[0]);
        c.start();
        try {
            c.listState();
        } finally {
            c.zk.close();
        }
    }
}