Curator之Leader選舉
阿新 • • 發佈:2019-02-16
基於LeaderSelector實現
當客戶端成為Leader時,Curator會在獨立的執行緒中以非同步回撥的方式呼叫takeLeadership,執行相應的邏輯;takeLeadership方法一旦返回,即代表主動釋放Leadership;
優點如下:
1. 非同步非阻塞;
2. 對於Leadership控制很靈活;
3. 呼叫autoRequeue()之後,Leadership釋放後會自動重新排隊競爭Leader(預設不會重新排隊);
4. 連接出現異常時,原先Leader正在執行的邏輯會被取消,防止出現併發;
/**
 * Leader-election client built on Curator's {@code LeaderSelector}.
 *
 * <p>{@link #init()} builds and starts a {@code CuratorFramework} client and a
 * {@code LeaderSelector} on {@code LOCK_PATH}; whenever this instance is handed
 * leadership, the listener's {@code takeLeadership} callback runs
 * {@link #leaderAction()} on the selector's single-thread executor.
 *
 * @author ruipeng.lrp
 * @since 2017/10/19
 **/
public class LeaderSelectorClient implements Closeable {
    private static final String NAMESPACE = "dps";
    private static final String LOCK_PATH = "/leader";

    private CuratorFramework client;
    private LeaderSelector leaderSelector;
    private String connectString = "30.117.84.40:2181";
    private int sessionTimeoutMs = 15 * 1000;
    private int connectionTimeoutMs = 3 * 1000;
    // Written by the caller's thread, read by the leadership callback thread.
    private volatile String name;

    /**
     * Builds the Curator client and the leader selector, then starts both.
     *
     * <p>Call {@link #setName(String)} first: the leadership callback may fire
     * as soon as the selector has been started.
     *
     * @throws Exception if building or starting either component fails; both
     *         components are closed (best effort) before the exception is rethrown
     */
    public void init() throws Exception {
        try {
            client = CuratorFrameworkFactory.builder()
                    .connectString(this.connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3)) // retry policy used on connection loss
                    .namespace(NAMESPACE) // per-application isolation, similar to a chroot
                    .build();
            leaderSelector = new LeaderSelector(client, LOCK_PATH, Executors.newSingleThreadExecutor(),
                    new LeaderSelectorListenerAdapter() {
                        @Override
                        public void takeLeadership(CuratorFramework client) throws Exception {
                            leaderAction();
                        }
                    });
            // Re-enter the election queue each time leadership is released.
            leaderSelector.autoRequeue();
            client.start();
            leaderSelector.start();
        } catch (Exception e) {
            closeAfterFailure(leaderSelector, e);
            closeAfterFailure(client, e);
            // Propagate instead of swallowing: the caller must know the election is not running.
            throw e;
        }
    }

    /**
     * Closes {@code resource} during failure cleanup. A runtime exception raised
     * by the close itself (closeQuietly is only relied on to swallow IOException)
     * is attached to {@code failure} as suppressed so it cannot mask the original cause.
     */
    private static void closeAfterFailure(Closeable resource, Exception failure) {
        try {
            CloseableUtils.closeQuietly(resource);
        } catch (RuntimeException cleanupFailure) {
            failure.addSuppressed(cleanupFailure);
        }
    }

    /**
     * Business logic executed while this instance is the leader: prints the
     * instance name once per second and never returns normally.
     *
     * @throws Exception in particular {@link InterruptedException} if the
     *         sleeping thread is interrupted
     */
    public void leaderAction() throws Exception {
        while (true) {
            System.out.println(name + " is the leader");
            Thread.sleep(1000);
        }
    }

    /** Closes the leader selector and then the Curator client. */
    @Override
    public void close() throws IOException {
        CloseableUtils.closeQuietly(leaderSelector);
        CloseableUtils.closeQuietly(client);
        System.out.println("shutting down...");
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public static void main(String[] args) throws Exception {
        LeaderSelectorClient c1 = new LeaderSelectorClient();
        // Name the instance before starting the election so the asynchronous
        // leadership callback never prints "null is the leader".
        c1.setName("A1");
        c1.init();
    }
}
基於LeaderLatch實現
相當於分散式的CountDownLatch,呼叫await()阻塞等待直到成為leader;只有呼叫close()才會主動釋放leadership,且close()之後同一個LeaderLatch例項無法再次start,也就無法再次排隊競爭leader;
/**
 * Leader-election client built on Curator's {@code LeaderLatch}.
 *
 * <p>{@link #init()} builds and starts a {@code CuratorFramework} client and a
 * {@code LeaderLatch} on {@code LOCK_PATH}; {@link #leaderAction()} then blocks
 * the calling thread until this instance holds leadership.
 */
public class LeaderLatchClient implements Closeable {
    private static final String NAMESPACE = "dps";
    private static final String LOCK_PATH = "/leader";

    private CuratorFramework client;
    private LeaderLatch leaderLatch;
    private String connectString = "30.117.84.40:2181";
    private int sessionTimeoutMs = 15 * 1000;
    private int connectionTimeoutMs = 3 * 1000;
    private String name;

    /**
     * Builds the Curator client and the leader latch, then starts both.
     *
     * @throws Exception if building or starting either component fails; both
     *         components are closed (best effort) before the exception is rethrown
     */
    public void init() throws Exception {
        try {
            client = CuratorFrameworkFactory.builder()
                    .connectString(this.connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3)) // retry policy used on connection loss
                    .namespace(NAMESPACE) // per-application isolation, similar to a chroot
                    .build();
            leaderLatch = new LeaderLatch(client, LOCK_PATH);
            client.start();
            leaderLatch.start();
        } catch (Exception e) {
            closeAfterFailure(leaderLatch, e);
            closeAfterFailure(client, e);
            // Propagate instead of swallowing: otherwise leaderAction() would later
            // fail on a missing or closed latch with a misleading error.
            throw e;
        }
    }

    /**
     * Closes {@code resource} during failure cleanup. A runtime exception raised
     * by the close itself (closeQuietly is only relied on to swallow IOException)
     * is attached to {@code failure} as suppressed so it cannot mask the original cause.
     */
    private static void closeAfterFailure(Closeable resource, Exception failure) {
        try {
            CloseableUtils.closeQuietly(resource);
        } catch (RuntimeException cleanupFailure) {
            failure.addSuppressed(cleanupFailure);
        }
    }

    /**
     * Business logic executed once this instance is the leader: blocks until
     * leadership is acquired, then prints the instance name once per second for
     * as long as the latch reports leadership.
     *
     * @throws Exception if waiting on the latch fails or the thread is interrupted
     */
    public void leaderAction() throws Exception {
        leaderLatch.await(); // block until this instance becomes leader
        // Keep polling the leadership state and stop as soon as it is lost.
        while (leaderLatch.hasLeadership()) {
            System.out.println(name + " is the leader");
            Thread.sleep(1000);
        }
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    /** Closes the leader latch and then the Curator client. */
    @Override
    public void close() throws IOException {
        CloseableUtils.closeQuietly(leaderLatch);
        CloseableUtils.closeQuietly(client);
    }

    public static void main(String[] args) throws Exception {
        LeaderLatchClient c1 = new LeaderLatchClient();
        c1.setName("A1");
        // init() cleans up after itself on failure, so it stays outside the try:
        // close() must only run for components that were started successfully.
        c1.init();
        try {
            c1.leaderAction();
        } finally {
            // Release the latch and the ZooKeeper connection when leadership ends or an error occurs.
            c1.close();
        }
    }
}