使用curator操作zookeeper
阿新 • • 發佈:2019-02-14
使用Java操作zookeeper時,一般有兩種方式:使用zkclient或者curator,相比較來說,curator的使用較為簡便。今天就來看看如何使用curator來操作zookeeper。
需要的依賴如下:
<!-- Curator high-level API (CuratorFramework, CuratorFrameworkFactory, watchers, listeners). -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.8.0</version>
</dependency>
<!-- Curator low-level client (retry policies such as RetryNTimes). -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>2.8.0</version>
</dependency>
<!-- ZooKeeper client library (CreateMode, WatchedEvent, Watcher). -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>16.0.1</version>
</dependency>
<!-- Required by the listings below: they import org.apache.commons.lang.StringUtils,
     which none of the artifacts above is declared to provide. -->
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
</dependency>
<!-- NOTE(review): the listings also use org.slf4j; slf4j-api is presumably pulled in
     transitively by curator/zookeeper - confirm with "mvn dependency:tree". -->
首先,我們建立一個客戶端類,來真正的操作zookeeper,包括建立連線、建立節點,寫入值、讀取值等。
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CuratorZookeeperClient { private final int CONNECT_TIMEOUT = 15000; private final int RETRY_TIME = Integer.MAX_VALUE; private final int RETRY_INTERVAL = 1000; private static final Logger logger = LoggerFactory.getLogger(CuratorZookeeperClient.class); private CuratorFramework curator; private volatile static CuratorZookeeperClient instance; /** * key:父路徑,如/jobcenter/client/goodscenter * value:Map-->key:子路徑,如/jobcenter/client/goodscenter/goodscenter00000001 * value:路徑中的值 */ private static ConcurrentHashMap<String,Map<String,String>> zkCacheMap = new ConcurrentHashMap<String,Map<String,String>>(); public static Map<String,Map<String,String>> getZkCacheMap() { return zkCacheMap; } private CuratorFramework newCurator(String zkServers) { return CuratorFrameworkFactory.builder().connectString(zkServers) .retryPolicy(new RetryNTimes(RETRY_TIME, RETRY_INTERVAL)) .connectionTimeoutMs(CONNECT_TIMEOUT).build(); } private CuratorZookeeperClient(String zkServers) { if(curator == null) { curator = newCurator(zkServers); curator.getConnectionStateListenable().addListener(new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState state) 
{ if (state == ConnectionState.LOST) { //連線丟失 logger.info("lost session with zookeeper"); } else if (state == ConnectionState.CONNECTED) { //連線新建 logger.info("connected with zookeeper"); } else if (state == ConnectionState.RECONNECTED) { logger.info("reconnected with zookeeper"); //連線重連 for(ZkStateListener s:stateListeners){ s.reconnected(); } } } }); curator.start(); } } public static CuratorZookeeperClient getInstance(String zkServers) { if(instance == null) { synchronized(CuratorZookeeperClient.class) { if(instance == null) { logger.info("initial CuratorZookeeperClient instance"); instance = new CuratorZookeeperClient(zkServers); } } } return instance; } /** * 寫資料:/docker/jobcenter/client/app/app0..../app1...../app2 * @param path * @param content * * @return 返回真正寫到的路徑 * @throws Exception */ public String write(String path,String content) throws Exception { StringBuilder sb = new StringBuilder(path); String writePath = curator.create().creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(sb.toString(), content.getBytes("utf-8")); return writePath; } /** * 隨機讀取一個path子路徑 * 先從cache中讀取,如果沒有,再從zookeeper中查詢 * @param path * @return * @throws Exception */ public String readRandom(String path) throws Exception { String parentPath = path; Map<String,String> cacheMap = zkCacheMap.get(path); if(cacheMap != null && cacheMap.size() > 0) { logger.debug("get random value from cache,path="+path); return getRandomValue4Map(cacheMap); } if(curator.checkExists().forPath(path) == null) { logger.debug("path [{}] is not exists,return null",path); return null; } else { logger.debug("read random from zookeeper,path="+path); cacheMap = new HashMap<String,String>(); List<String> list = curator.getChildren().usingWatcher(new ZKWatcher(parentPath,path)).forPath(path); if(list == null || list.size() == 0) { logger.debug("path [{}] has no children return null",path); return null; } Random rand = new Random(); String child = list.get(rand.nextInt(list.size())); path = 
path + "/" + child; byte[] b = curator.getData().usingWatcher(new ZKWatcher(parentPath,path)) .forPath(path); String value = new String(b,"utf-8"); if(StringUtils.isNotBlank(value)) { cacheMap.put(path, value); zkCacheMap.put(parentPath, cacheMap); } return value; } } /** * 讀取path下所有子路徑下的內容 * 先從map中讀取,如果不存在,再從zookeeper中查詢 * @param path * @return * @throws Exception */ public List<String> readAll(String path) throws Exception { String parentPath = path; Map<String,String> cacheMap = zkCacheMap.get(path); List<String> list = new ArrayList<String>(); if(cacheMap != null) { logger.debug("read all from cache,path="+path); list.addAll(cacheMap.values()); return list; } if(curator.checkExists().forPath(path) == null) { logger.debug("path [{}] is not exists,return null",path); return null; } else { cacheMap = new HashMap<String,String>(); List<String> children = curator.getChildren().usingWatcher(new ZKWatcher(parentPath,path)).forPath(path); if(children == null || children.size() == 0) { logger.debug("path [{}] has no children,return null",path); return null; } else { logger.debug("read all from zookeeper,path="+path); String basePath = path; for(String child : children) { path = basePath + "/" + child; byte[] b = curator.getData().usingWatcher(new ZKWatcher(parentPath,path)) .forPath(path); String value = new String(b,"utf-8"); if(StringUtils.isNotBlank(value)) { list.add(value); cacheMap.put(path, value); } } } zkCacheMap.put(parentPath, cacheMap); return list; } } /** * 隨機獲取Map中的一個值 * @param map * @return */ private String getRandomValue4Map(Map<String,String> map) { Object[] values = map.values().toArray(); Random rand = new Random(); return values[rand.nextInt(values.length)].toString(); } public void delete(String path) throws Exception { if(curator.checkExists().forPath(path) != null) { curator.delete().inBackground().forPath(path); zkCacheMap.remove(path); } } /** * 獲取路徑下的所有子路徑 * @param path * @return */ public List<String> getChildren(String path) throws 
Exception { if(curator.checkExists().forPath(path) == null) { logger.debug("path [{}] is not exists,return null",path); return null; } else { List<String> children = curator.getChildren().forPath(path); return children; } } public void close() { if(curator != null) { curator.close(); curator = null; } zkCacheMap.clear(); } /** * zookeeper監聽節點資料變化 * @author lizhiyang * */ private class ZKWatcher implements CuratorWatcher { private String parentPath; private String path; public ZKWatcher(String parentPath,String path) { this.parentPath = parentPath; this.path = path; } public void process(WatchedEvent event) throws Exception { Map<String,String> cacheMap = zkCacheMap.get(parentPath); if(cacheMap == null) { cacheMap = new HashMap<String,String>(); } if(event.getType() == Event.EventType.NodeDataChanged || event.getType() == Event.EventType.NodeCreated){ byte[] data = curator.getData(). usingWatcher(this).forPath(path); cacheMap.put(path, new String(data,"utf-8")); logger.info("add cache={}",new String(data,"utf-8")); } else if(event.getType() == Event.EventType.NodeDeleted) { cacheMap.remove(path); logger.info("remove cache path={}",path); } else if(event.getType() == Event.EventType.NodeChildrenChanged) { //子節點發生變化,重新進行快取 cacheMap.clear(); List<String> children = curator.getChildren().usingWatcher(new ZKWatcher(parentPath,path)).forPath(path); if(children != null && children.size() > 0) { for(String child : children) { String childPath = parentPath + "/" + child; byte[] b = curator.getData().usingWatcher(new ZKWatcher(parentPath,childPath)) .forPath(childPath); String value = new String(b,"utf-8"); if(StringUtils.isNotBlank(value)) { cacheMap.put(childPath, value); } } } logger.info("node children changed,recaching path={}",path); } zkCacheMap.put(parentPath, cacheMap); } } private final Set<ZkStateListener> stateListeners = new CopyOnWriteArraySet<ZkStateListener>(); public void addStateListener(ZkStateListener listener) { stateListeners.add(listener); }
其中,我們對節點和值進行了快取,避免頻繁的訪問zookeeper。在對zookeeper操作時,對連線丟失、連線新建、重連等事件進行了監聽;重連時會通知已註冊的ZkStateListener介面,其定義如下:
/**
 * Callback registered through CuratorZookeeperClient.addStateListener to be told
 * about ZooKeeper connection recovery.
 */
public interface ZkStateListener {
/**
 * Called by CuratorZookeeperClient for every registered listener when the
 * Curator connection state changes to RECONNECTED.
 */
void reconnected();
}
下面,我們就使用ZkDockerService類來封裝客戶端的操作。
其中在使用定時任務時,使用到了NamedThreadFactory,如下:import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * jobclient docker改造 * 註冊應用資訊至zookeeper * @author lizhiyang * */ public class ZkDockerService { private static final Logger logger = LoggerFactory.getLogger(ZkDockerService.class); private CuratorZookeeperClient zkClient; private Set<String> zkPathList = new HashSet<String>(); // 失敗重試定時器,定時檢查是否有請求失敗,如有,無限次重試 private ScheduledFuture<?> retryFuture; // 定時任務執行器 private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("RegistryFailedRetryTimer", true)); //需要重新註冊的資料 private Set<ClientData> retrySet = new HashSet<ClientData>(); /** * init-method,初始化執行 * 將本機docker的IP地址 埠都註冊到zookeeper中 */ public void register2Zookeeper() { try { zkClient = CuratorZookeeperClient.getInstance(ZOOKEEPER_ADDRESS); ClientData client = findClientData(); registerClientData(client); zkClient.addStateListener(new ZkStateListener(){ @Override public void reconnected() { ClientData client = findClientData(); //將服務新增到重試列表 retrySet.add(client); } }); //啟動執行緒進行重試,1秒執行一次,因為jobcenter的定時觸發時間最短的是1秒 this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { public void run() { // 檢測並連線註冊中心 try { retryRegister(); } catch (Throwable t) { // 防禦性容錯 logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, 1, 1, TimeUnit.SECONDS); } catch (Exception e) { logger.error("zookeeper write exception",e); } } /** * destrory-method,銷燬時執行 */ public void destroy4Zookeeper() { logger.info("zkDockerService destrory4Zookeeper path="+zkPathList); try { if(retryFuture != null){ retryFuture.cancel(true); } } catch 
(Throwable t) { logger.warn(t.getMessage(), t); } if(zkPathList != null && zkPathList.size() > 0) { for(String path : zkPathList) { try { zkClient.delete(path); } catch (Exception e) { logger.error("zkDockerService destrory4Zookeeper exception",e); } } } zkClient.close(); } /** 構造要儲存的物件 **/ private ClientData findClientData() { ClientData client = new ClientData(); client.setIpAddress(ip); client.setPort(port); client.setSource(1); return client; } /** 將值寫入zookeeper中 **/ private void registerClientData(ClientData client) throws Exception{ String centerPath = "/server"; String content = ""; String strServer = zkClient.write(centerPath, content); if(!StringUtils.isBlank(strServer)) { zkPathList.add(strServer); } } /** * 重連到zookeeper時,自動重試 */ protected synchronized void retryRegister() { if(!retrySet.isEmpty()){ logger.info("jobclient begin retry register client to zookeeper"); Set<ClientData> retryClients = new HashSet<ClientData>(retrySet); for(ClientData data :retryClients){ logger.info("retry register="+data); try { registerJobcenterClient(data); retrySet.remove(data); } catch (Exception e) { logger.error("registerJobcenterClient failed",e); } } } } }
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class NamedThreadFactory implements ThreadFactory
{
private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
private final AtomicInteger mThreadNum = new AtomicInteger(1);
private final String mPrefix;
private final boolean mDaemo;
private final ThreadGroup mGroup;
public NamedThreadFactory()
{
this("pool-" + POOL_SEQ.getAndIncrement(),false);
}
public NamedThreadFactory(String prefix)
{
this(prefix,false);
}
public NamedThreadFactory(String prefix,boolean daemo)
{
mPrefix = prefix + "-thread-";
mDaemo = daemo;
SecurityManager s = System.getSecurityManager();
mGroup = ( s == null ) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
}
public Thread newThread(Runnable runnable)
{
String name = mPrefix + mThreadNum.getAndIncrement();
Thread ret = new Thread(mGroup,runnable,name,0);
ret.setDaemon(mDaemo);
return ret;
}
public ThreadGroup getThreadGroup()
{
return mGroup;
}
}