1. 程式人生 > >ZooKeeper 分散式過程協同技術詳解 —— 讀書筆記

ZooKeeper 分散式過程協同技術詳解 —— 讀書筆記

一、Master.java

package com.dayw.zk;

import java.io.IOException;
import java.util.Random;

import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Master implements Watcher {
    
    static final Logger LOG = LoggerFactory.getLogger(Master.class);
    
    ZooKeeper zk;
    static String hostPort;

    public Master(String host) {
        super();
        this.hostPort = host;
    }

    void startZK(String host) {
        try {
            zk = new ZooKeeper(host, 15000, this);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    void stopZK() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void process(WatchedEvent e) {
        System.out.println(e);
    }

    Random ran = new Random();
    String serverId = Integer.toHexString(ran.nextInt());
    static boolean isLeader = false;
    
    StringCallback masterCreateCallback = new StringCallback() {
        
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            switch (Code.get(rc)) {
            case CONNECTIONLOSS:
                checkMaster();
                return;
            case OK:
                isLeader = true;
                break;
            default:
                isLeader = false;
                break;
            }            
            System.out.println("I'm " + (isLeader ? "" : "not ") + 
                    "the leader");
        }
    };

    DataCallback masterCheckCallback = new DataCallback() {
        
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            switch (Code.get(rc)) {
            case CONNECTIONLOSS:
                checkMaster();
                return;
            case NONODE:
                try {
                    runForMaster();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return;
            default:
                break;
            }
        }
    };
    
    void checkMaster() {
        zk.getData("/master", false, masterCheckCallback, null);
        /** 
         * 同步
         * while (true) {
            
            Stat stat = new Stat();
            try {
                byte data[] = zk.getData("/master", false, stat);
                isLeader = new String(data).equals(serverId);
                return true;
            } catch (KeeperException e) {
                return false;
            } catch (InterruptedException e) {
            }
        } */
    }

    void runForMaster() throws InterruptedException {
        /** 非同步 */
        zk.create("/master", serverId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, masterCreateCallback, null);
        /** 同步
         * while (true) {
            try {
                zk.create("/master", serverId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                isLeader = true;
                break;
            } catch (NodeExistsException e) {
                isLeader = false;
                break;
            }

            catch (KeeperException e) {
            }
            if (checkMaster())
                break;
        } */

    }
    
    public void bootstrap() {
        createParent("/workers", new byte[0]);
        createParent("/assign", new byte[0]);
        createParent("/tasks", new byte[0]);
        createParent("/status", new byte[0]);
    }
    
    
    StringCallback createParentCallback = new StringCallback() {
        
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            switch (Code.get(rc)) {
            case CONNECTIONLOSS:
                createParent(path, (byte[]) ctx);
                break;
            case OK:
                LOG.info("Parent created");
                break;
            case NODEEXISTS:
                LOG.warn("Parent already registerd: " + path);
                break;
            default:
                LOG.error("Something went wrong: " + KeeperException.create(Code.get(rc), path));
                break;
            }
        }
    };
    
    void createParent(String path, byte[] data) {
        zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createParentCallback, data);
    }

    public static void main(String[] args) throws Exception {
        Master z = new Master("172.16.16.176,2181");
        z.startZK(hostPort);
        z.runForMaster();
        if (isLeader) {
            System.out.println("I'm the leader");
            Thread.sleep(60000);
        } else {
            System.out.println("Someone else is the leader");
        }
        z.stopZK();
    }

}

二、Worker.java

package com.dayw.zk;

import java.io.IOException;
import java.util.Random;

import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Worker implements Watcher {

    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);

    ZooKeeper zk;
    String hostPort;
    String status;
    String serverId = Integer.toHexString(new Random().nextInt());

    public Worker(String hostPort) {
        this.hostPort = hostPort;
    }

    void startZK() throws IOException {
        zk = new ZooKeeper(hostPort, 1500, this);
    }

    @Override
    public void process(WatchedEvent event) {
        LOG.info(event.toString() + ", " + hostPort);
    }

    StringCallback createWorkerCallback = new StringCallback() {

        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            switch (Code.get(rc)) {
            case CONNECTIONLOSS:
                register();
                break;
            case OK:
                LOG.info("Registered successfully:" + serverId);
                break;
            case NODEEXISTS:
                LOG.warn("Already registered:" + serverId);
                break;
            default:
                LOG.error("Something went wrong:" + KeeperException.create(Code.get(rc), path));
                break;
            }
        }
    };

    void register() {
        zk.create("/workers/worker-" + serverId, 
                "Idle".getBytes(), 
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, createWorkerCallback, null);
    }
    
    /** setStatus */
    StatCallback statusUpdateCallback = new StatCallback() {
        
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            switch (Code.get(rc)) {
            case CONNECTIONLOSS:
                updateStatus("test" , (String)ctx);
                return;
            default:
                break;
            }
        }
    };
    
    synchronized private void updateStatus(String name, String status) {
        if (status == this.status) {
            zk.setData("/workers" + name, status.getBytes(), -1, statusUpdateCallback, status);
        }
    }
    
    public void setStatus(String status) {
        this.status = status;
        updateStatus("test", status);
    }
    
    
    public static void main(String[] args) throws Exception {
        Worker w = new Worker(args[0]);
        w.startZK();
        w.register();
        Thread.sleep(30000);
    }

}

三、Client.java

package com.dayw.zk;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Client implements Watcher {

    private static final Logger LOG = LoggerFactory.getLogger(Client.class);

    ZooKeeper zk;
    String hostPort;

    Client(String hostPort) {
        this.hostPort = hostPort;
    }

    void startZK() throws Exception {
        zk = new ZooKeeper(hostPort, 15000, this);
    }
    
    String queueCommand(String command) throws KeeperException {
        String name = null;
        while (true) {
            try {
                name = zk.create("/tasks/task-", 
                        command.getBytes(), 
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
                return name;
            } catch (NodeExistsException e) {
                LOG.error(name + " already appears to be running");
            } catch (ConnectionLossException  e) {
        
            } catch (InterruptedException e) {
                // TODO: handle exception
            }
        }
    }

    @Override
    public void process(WatchedEvent event) {
        // TODO Auto-generated method stub

    }

}

四、AdminClient.java

package com.dayw.zk;

import java.util.Date;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdminClient implements Watcher {

    private static final Logger LOG = LoggerFactory.getLogger(AdminClient.class);

    ZooKeeper zk;
    String hostPort;

    AdminClient(String hostPort) {
        this.hostPort = hostPort;
    }

    void start() throws Exception {
        zk = new ZooKeeper(hostPort, 15000, this);
    }

    void listState() throws KeeperException, InterruptedException {
        /** master */
        try {
            Stat stat = new Stat();
            byte masterData[] = zk.getData("/master", false, stat);
            Date startDate = new Date(stat.getCtime());
            System.out.println("Master: " + new String(masterData) + " since " + startDate);
        } catch (NoNodeException e) {
            System.out.println("No Master");
        }
        
        /** Workers */
        System.out.println("Workers:");
        for (String w : zk.getChildren("/workers", false)) {
            byte data[] = zk.getData("/workers/" + w, false, null);
            String state = new String(data);
            System.out.println("\t" + w + ": " + state);
        }
        
        /** Tasks */
        System.out.println("Tasks:");
        for (String t : zk.getChildren("/assign", false)) {
            System.out.println("\t" + t);
        }
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println(event);
    }
    
    public static void main(String args[]) throws Exception {
        AdminClient c = new AdminClient(args[0]);
        c.start();
        c.listState();
    }

}