1. 程式人生 > >Apache Curator Leader Election

Apache Curator Leader Election

用於Leader選舉,也可以用Shared Reentrant Lock來實現。

如果需要叢集中的固定的一臺機器去做的事,就可以用此特性來實現,直到這臺Leader死去,會產生新的Leader。

還有一種典型的場景,master-slave模式。也可以用Curator Leader Election輕鬆實現,包括充當maste的機器死掉後,會產生新的master,以及當新加入機器後,該機器會直接參與Leader競爭者。That's Awesome.

1.直接執行LLtest

package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientCreate;
import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientDelete;
import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientUpdate;

public class LLTest {
    public static void main(String[] args) throws Exception {
        LeaderListener.main(null);
        LeaderListener2.main(null);
        LeaderListener3.main(null);
        LeaderListener4.main(null);
    }
}


package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

import java.util.List;

public class LeaderListener {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app1";

    public static void main(String[] args) {
        try {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        String zookeeperConnectionString = "127.0.0.1:2181";
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                        CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                        client.start();

                        //ensure path of /test
                        new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

                        final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() {
                            @Override
                            public void takeLeadership(CuratorFramework client) throws Exception {
                                try {
                                    int timeMilliSeconds = 6000;
                                    System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                                    //once you take the leader ship
                                    //and you want hold the leader ship during the whole life of APP1
                                    //you should use Thread.sleep(Integer.MAX_VALUE)
                                    //once tha APP1 dead, the other APP (participants) will reElect an new leader
                                    for(int i = 0; i < 6; i++){
                                        System.out.println("===" + APP_NAME + " sleep " + i);
                                        Thread.sleep(1000);
                                    }
                                }catch (Exception e){
                                    e.printStackTrace();
                                }
                            }

                            @Override
                            public void stateChanged(CuratorFramework client, ConnectionState newState) {

                            }
                        });

                        leaderSelector.start();

                        Thread.sleep(Integer.MAX_VALUE);
                        client.close();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }).start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}


package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

public class LeaderListener2 {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app2";

    public static void main(String[] args) {
        try {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        String zookeeperConnectionString = "127.0.0.1:2181";
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                        CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                        client.start();

                        //ensure path of /test
                        new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

                        final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() {
                            @Override
                            public void takeLeadership(CuratorFramework client) throws Exception {
                                try {
                                    int timeMilliSeconds = 6000;
                                    System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                                    for(int i = 0; i < 6; i++){
                                        System.out.println("===" + APP_NAME + " sleep " + i);
                                        Thread.sleep(1000);
                                    }
                                }catch (Exception e){
                                    e.printStackTrace();
                                }
                            }

                            @Override
                            public void stateChanged(CuratorFramework client, ConnectionState newState) {

                            }
                        });

                        leaderSelector.start();

                        Thread.sleep(Integer.MAX_VALUE);
                        client.close();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }).start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}


package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

public class LeaderListener3 {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app3";

    public static void main(String[] args) {
        try {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        String zookeeperConnectionString = "127.0.0.1:2181";
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                        CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                        client.start();

                        //ensure path of /test
                        new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

                        final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() {
                            @Override
                            public void takeLeadership(CuratorFramework client) throws Exception {
                                try {
                                    int timeMilliSeconds = 6000;
                                    System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                                    for(int i = 0; i < 6; i++){
                                        System.out.println("===" + APP_NAME + " sleep " + i);
                                        Thread.sleep(1000);
                                    }
                                }catch (Exception e){
                                    e.printStackTrace();
                                }
                            }

                            @Override
                            public void stateChanged(CuratorFramework client, ConnectionState newState) {

                            }
                        });

                        leaderSelector.start();

                        Thread.sleep(Integer.MAX_VALUE);
                        client.close();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }).start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}


package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

public class LeaderListener4 {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app4";

    public static void main(String[] args) {
        try {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        String zookeeperConnectionString = "127.0.0.1:2181";
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                        CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                        client.start();

                        //ensure path of /test
                        new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

                        final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() {
                            @Override
                            public void takeLeadership(CuratorFramework client) throws Exception {
                                try {
                                    int timeMilliSeconds = 6000;
                                    System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                                    for(int i = 0; i < 6; i++){
                                        System.out.println("===" + APP_NAME + " sleep " + i);
                                        Thread.sleep(1000);
                                    }
                                }catch (Exception e){
                                    e.printStackTrace();
                                }
                            }

                            @Override
                            public void stateChanged(CuratorFramework client, ConnectionState newState) {

                            }
                        });

                        leaderSelector.start();

                        Thread.sleep(Integer.MAX_VALUE);
                        client.close();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }).start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}


相關推薦

Apache Curator Leader Election

用於Leader選舉,也可以用Shared Reentrant Lock來實現。 如果需要叢集中的固定的一臺機器去做的事,就可以用此特性來實現,直到這臺Leader死去,會產生新的Leader。 還有一種典型的場景,master-slave模式。也可以用Curator Le

Zookeeper 學習筆記之 Leader Election

通知 客戶 就會 lec 搶占式 類型 二次 lead per ZooKeeper四種節點類型: Persist Persist_Sequential Ephemeral Ephemeral_Sequential 在節點上可註冊的Watch,客戶端先得到通知再得到數據,

Apache Curator操作zookeeper的API使用

zookeeper 分布式 集群 curator 中間件 curator簡介與客戶端之間的異同點 常用的zookeeper java客戶端: zookeeper原生Java API zkclient Apache curator ZooKeeper原生Java API的不足之處: 在

如何運用zookepper進行kafka Leader Election?

註冊 bsp roo 可能 zookepper 監聽 election 如果 etc 主要有兩種方法: (一)搶註Leader節點-----非公平模式 (二)先到先得,後者監聽前者-----公平模式 (一)搶註Leader節點-----非公平模式 1.創建Leader父節點

【zookeeper】Apache curator的使用及zk分布式鎖實現

sets finally tac -- ont zkcli 單節點 基本操作 新建 上篇,本篇主要講Apache開源的curator的使用,有了curator,利用Java對zookeeper的操作變得極度便捷. 其實在學之前我也有個疑慮,我為啥要學curator,撇開漲薪

【zookeeper】Apache curator的使用及zk分散式鎖實現

接上篇,本篇主要講Apache開源的curator的使用,有了curator,利用Java對zookeeper的操作變得極度便捷. 其實在學之前我也有個疑慮,我為啥要學curator,撇開漲薪這些外在的東西,就單技術層面來講,學curator能幫我做些什麼?這就不得不從zookeeper說起,上

Apache Curator客戶端

一:Apache Curator簡介 1. Curator主要從以下幾個方面降低了zk使用的複雜性 重試機制:提供可插拔的重試機制, 它將給捕獲所有可恢復的異常配置一個重試策略,並且內部也提供了幾種標準的重試策略(比如指數補償) 連線狀態監控: Curator初始化

java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy解決方法

今天整合es-job到公司的框架時,啟動時出現上述錯誤 java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy at storm.kafka.KafkaSpout.open(KafkaSpout.java:68) at backtype.

zookeeper Apache Curator

1 簡介     Curator是Netflix公司開源的一套Zookeeper客戶端框架。瞭解過Zookeeper原生API都會清楚其複雜度。Curator幫助我們在其基礎上進行封裝、實現一些開發細節,包括接連重連、反覆註冊Watcher和NodeExistsExcep

第7章 Apache Curator客戶端的使用

Apache Curator客戶端的使用 7-1 curator簡介與客戶端之間的異同點 7-2 搭建maven工程,建立curator與zkserver的連線 7-3 zk名稱空間以及建立節點 7-9 zk-watcher例項 統一更新N臺節點的配置

Kafka引數unclean.leader.election.enable詳解

如何提高Kafka可靠性是一個可以長篇大論的主題。很多初學者會簡單的認為將客戶端引數acks設定為-1即可保證Kafka的可靠性,顯然這是很片面的觀點。就可靠性本身而言,它並不是一個可以用“是”或者“否”來衡量的一個指標,而一般是用幾個9來衡量。就引數方面而言,與Kafka可靠性相關的引數不止ack

Apache Curator簡單介紹

提供了一個抽象級別更高的API,來操作Zookeeper,類似Guava提供的很多工具,讓Java書寫起來更加方便。至於有沒有用,那就要看每個人自己的理解了。 1、依賴 <dependency> <groupId>org.a

storm:ConnectionStateManager-0 WARN org.apache.curator.framework.state.ConnectionStateManager

strom異常解決: [ConnectionStateManager-0] WARN  org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListe

使用Apache Curator管理ZooKeeper

Apache ZooKeeper是為了幫助解決複雜問題的軟體工具,它可以幫助使用者從複雜的實現中解救出來。 然而,ZooKeeper只暴露了原語,這取決於使用者如何使用這些原語來解決應用程式中的協調問題。 社群已經在ZooKeeper資料模型及其API之上開發了

Apache Curator操作zookeeper的API使用——watcher

curator在註冊watch事件上,提供了一個usingWatcher方法,使用這個方法註冊的watch事件和預設watch事件一樣,監聽只會觸發一次,監聽完畢後就會銷燬,也就是一次性的。而這個方法有兩種引數可選,一個是zk原生API的Watcher介面的實現類,另一個是Curator提供的Cur

深入學習Kafka:Leader Election

本文所講的Leader是指叢集中的Controller,而不是各個Partition的Leader。 為什麼要有Leader? 在Kafka早期版本,對於分割槽和副本的狀態的管理依賴於zookeeper的Watcher和佇列:每一個broker都會在

使用Apache Curator實現服務的註冊和發現

使用zookeeper可以實現服務的註冊和發現,而Curator是對zookeeper進行的一層封裝,自然也封裝了一套實現服務的註冊和發現,本文就介紹如何使用Curator實現服務的註冊和發現 首先要安裝zookeeper,我這裡安裝的是:zookeeper-3.4.6

【第三方類庫】Apache Curator

Curator是Netflix公司開源的一個Zookeeper客戶端,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。 包含一下幾個模

Apache Curator簡單使用(一)

轉載自: http://ifeve.com/zookeeper-curato-framework/                http://blog.csdn.net/dc_726/article/details/46475633                http

Apache curator-recipes程式碼範例

Apache curator-recipes程式碼例項 ? ? Apache curator-recipes元件提供了大量已經"生產化"(produced)的特性,極大的簡化了使用zk的複雜度. ? ? 1. Cache: 提供了對一個Node持續監聽,如果節點資料變更,