zookeeper 的斷線重連實現
阿新 • • 發佈:2019-02-18
ZooKeeper是一個分散式的、開放原始碼的分散式應用程式協調服務,是Google的Chubby的一個開源實現,也是Hadoop和HBase的重要元件。
我們可以使用zookeeper做程式的健康監測(EPHEMERAL 臨時節點)、公共配置檔案、叢集管理(leader 選舉)等等。
但是zookeeper本身並沒有提供斷線重連的功能,必須由我們手動實現。這裡使用Curator來實現zookeeper的斷線重連功能,程式碼如下:
Curator是Netflix公司開源的一個Zookeeper客戶端,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量,原生的zookeeper實現起來稍微麻煩一點,下面是Curator的maven配置:import java.io.UnsupportedEncodingException; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; /** * @ClassName: ZookeeperExcutor * @Description: zookeeper連線處理器 */ public class ZookeeperExcutor { private CuratorFramework client; public ZookeeperExcutor(String zklist,int sessionTimeout,int connectTimeout){ client = CuratorFrameworkFactory.builder() .connectString(zklist).sessionTimeoutMs(sessionTimeout) .connectionTimeoutMs(connectTimeout) .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); } public CuratorFramework getClient() { return client; } /** * @Title: createNodeAddListener * @Description: 新增node節點 * @param nodePath * @param nodeData 設定檔案 * @return void 返回型別 */ public String createNode(String nodePath,String nodeData){ if(client!=null){ try { String nodeName=client.create().creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(nodePath, nodeData.getBytes("UTF-8")); return nodeName; } catch (UnsupportedEncodingException e) { e.printStackTrace(); return null; } catch (Exception e) { e.printStackTrace(); return null; } } return null; } /** * @Title: getListener * @Description: 為節點新增 connectState 監聽器,實現斷線重連,然後新增上節點 * @param nodePath 節點路徑 * @param nodeData 節點資料 * @return void 返回型別 */ public ConnectionStateListener getListener(final String nodePath,final String nodeData){ if(null!=client){ ConnectionStateListener connectListener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework curatorFramework, 
ConnectionState connectionState) { if (connectionState == ConnectionState.LOST) { while (true) { try { //手動重連 boolean flag=curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut(); if (flag){ //重新新增節點 clearListener(); createNode(nodePath, nodeData); client.getConnectionStateListenable().addListener(getListener(nodePath, nodeData)); break; } } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }else if(connectionState==ConnectionState.RECONNECTED){ //重新連線成功 }else if(connectionState==ConnectionState.SUSPENDED){ //自動重連,自動新建 schedular的臨時節點 } } }; return connectListener; } return null; } public void clearListener(){ ListenerContainer<ConnectionStateListener> list=(ListenerContainer<ConnectionStateListener>) client.getConnectionStateListenable(); list.clear(); } public void addListener(String nodePath,String nodeData){ client.getConnectionStateListenable().addListener(getListener(nodePath, nodeData)); } public static void main(String[] args) { ZookeeperExcutor zke=new ZookeeperExcutor("127.0.0.1:2181",10000, 10000); String nodeName=zke.createNode("/Test", "test"); if(null!=nodeName){ zke.addListener("/Test", "test"); } } }
<!-- Maven dependencies for Curator 2.4.2: curator-framework, curator-client and curator-recipes. -->
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.4.2</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>2.4.2</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.4.2</version> </dependency>