1. 程式人生 > >Curator之Leader選舉

Curator之Leader選舉

基於LeaderSelector實現

  當客戶端成為Leader時,採用非同步回撥的方式呼叫takeLeadership,執行相應的邏輯;
  優點如下:
    1. 非同步非阻塞;
    2. 對於Leadership控制很靈活;
    3. Leadership釋放後可以重新排隊競爭Leader;
    4. 連接出現異常時,原先Leader正在執行的邏輯會被取消,防止出現併發;

/**
 * 類說明:Leader選舉客戶端
 * 
 * @author ruipeng.lrp
 * @since 2017/10/19
 **/
public class LeaderSelectorClient
implements Closeable{
private static final String NAMESPACE = "dps"; private static final String LOCK_PATH = "/leader"; private CuratorFramework client; private LeaderSelector leaderSelector; private String connectString = "30.117.84.40:2181"; private int sessionTimeoutMs = 15*1000
; private int connectionTimeoutMs = 3*1000; private String name; public void init() throws Exception{ try{ client = CuratorFrameworkFactory.builder() .connectString(this.connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(new
ExponentialBackoffRetry(1000, 3)) //斷連重試策略 .namespace(NAMESPACE) //應用隔離,相當於chroot .build(); leaderSelector = new LeaderSelector(client, LOCK_PATH, Executors.newSingleThreadExecutor(), new LeaderSelectorListenerAdapter(){ @Override public void takeLeadership(CuratorFramework client) throws Exception { leaderAction(); } }); leaderSelector.autoRequeue(); client.start(); leaderSelector.start(); }catch(Exception e){ CloseableUtils.closeQuietly(leaderSelector); CloseableUtils.closeQuietly(client); } } //成為leader後的業務邏輯 public void leaderAction() throws Exception{ while(true){ System.out.println(name + " is the leader"); Thread.sleep(1000); } } @Override public void close() throws IOException { CloseableUtils.closeQuietly(leaderSelector); CloseableUtils.closeQuietly(client); System.out.println("shutting down..."); } public String getName() { return name; } public void setName(String name) { this.name = name; } public static void main(String[] args) throws Exception { LeaderSelectorClient c1 = new LeaderSelectorClient(); c1.init(); c1.setName("A1"); } }

基於LeaderLatch實現

  相當於分散式的CountDownLatch,阻塞等待直到成為leader,leadership釋放後無法再次排隊競爭leader;

public class LeaderLatchClient implements Closeable{
    private static final String NAMESPACE = "dps";
    private static final String LOCK_PATH = "/leader";
    private CuratorFramework client;
    private LeaderLatch leaderLatch;
    private String connectString = "30.117.84.40:2181";
    private int sessionTimeoutMs = 15*1000;
    private int connectionTimeoutMs = 3*1000;
    private String name;

    public void init() throws Exception{
        try{
            client = CuratorFrameworkFactory.builder()
                    .connectString(this.connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3)) //斷連重試策略
                    .namespace(NAMESPACE) //應用隔離相當於chroot
                    .build();
            leaderLatch = new LeaderLatch(client, LOCK_PATH);
            client.start();
            leaderLatch.start();
        }catch(Exception e){
            CloseableUtils.closeQuietly(leaderLatch);
            CloseableUtils.closeQuietly(client);
        }
    }

    //成為leader後的業務邏輯
    public void leaderAction() throws Exception{
        leaderLatch.await(); //阻塞等待成為leader
        //持續關注leadership狀態變更
        while(leaderLatch.hasLeadership()){
            System.out.println(name + " is the leader");
            Thread.sleep(1000);
        }
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public void close() throws IOException {
        CloseableUtils.closeQuietly(leaderLatch);
        CloseableUtils.closeQuietly(client);
    }

    public static void main(String[] args) throws Exception {
        LeaderLatchClient c1 = new LeaderLatchClient();
        c1.init();
        c1.setName("A1");
        c1.leaderAction();
    }
}