巨集任務佇列與微任務佇列
阿新 • • 發佈:2021-10-15
ZooKeeper介紹
官方網站地址:http://zookeeper.apache.org/。
Hadoop的正式子專案,它是一個針對大型分散式系統的可靠協調系統,提供的功能包括:配置維護、名字服務、分散式鎖、選舉、事件監聽等。
zookeeper一般以一主多從的模式執行,以保證高可用性。
zookeeper每個節點的資料預設不可超過1MB,且所有資料均載入到記憶體,因此不適用於大量資料的儲存。
ShardingSphere-JDBC + zookeeper + MHA,可以實現mysql的狀態監控、主從自動切換、java應用資料來源自動切換。
Curator
curator是java類庫,專用於操作zookeeper。如下是一些使用示例。
pom.xml
引入依賴。
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>5.2.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>5.2.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.2.0</version> </dependency>
ZKTool.java
對zookeeper基礎操作的工具類。
package syb.test.busi; import java.nio.charset.Charset; import java.util.List; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; /** * zookeeper操作工具類 * */ public class ZKTool { public static CuratorFramework client = null; /** * 初始化,建立並啟動client。程式啟動時,執行一次 */ public static void init(String connectionString) { client = createClientAndStart(connectionString); } private static CuratorFramework createClientAndStart(String connectionString) { ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy); client.start(); return client; } /** * 關閉client。程式停止時,執行一次 */ public static void close() { if (client != null) { client.close(); } } public static CuratorFramework getClient() { return client; } public static boolean checkExist(String path) throws Exception { boolean exist = false; Stat stat = client.checkExists().forPath(path); if (stat != null) { exist = true; } return exist; } public static void setData(String path, String data) throws Exception { client.setData().forPath(path, data.getBytes()); } public static String getData(String path) throws Exception { byte[] contentBytes = client.getData().forPath(path); String contentString = new String(contentBytes, Charset.forName("UTF8")); return contentString; } public static String getData(String path, Watcher watcher) throws Exception { byte[] contentBytes = client.getData().usingWatcher(watcher).forPath(path); String contentString = new String(contentBytes, Charset.forName("UTF8")); return contentString; } public static String createNode(String path, CreateMode cm) throws Exception { String newNodePath = client.create().creatingParentsIfNeeded().withMode(cm).forPath(path, new 
byte[0]); return newNodePath; } public static String createNode(String path, CreateMode cm, String data) throws Exception { String newNodePath = client.create().creatingParentsIfNeeded().withMode(cm).forPath(path, data.getBytes()); return newNodePath; } public static void deleteNode(String path) throws Exception { client.delete().deletingChildrenIfNeeded().forPath(path); } /** * 獲取子節點列表。注:僅包含子節點的名稱,並不是全路徑。 */ public static List<String> getChildren(String path) throws Exception { List<String> cList = client.getChildren().forPath(path); return cList; } }
Watcher事件監聽
Watcher監聽只能執行一次,監聽到一次後,就失效了。可用於監聽連線斷開等事件。
**
*watcher方式的事件監聽
*/
privatevoidwatcherSample()throwsException{
StringworkerPath="/watcher_test/node";
if(!ZKTool.checkExist(workerPath)){//節點不存在
ZKTool.createNode(workerPath,CreateMode.PERSISTENT);
}
Stringcontent=ZKTool.getData(workerPath,(e)->{
logger.info("收到事件,WatchedEvent:{}",e);
});
logger.info("初始內容:{}",content);
logger.info("變更為1");
ZKTool.setData(workerPath,"1");
LockSupport.parkNanos(5L*1000*1000*1000);
}
curatorCache事件監聽
與Watcher不同,可持續監聽。
/**
*cache方式的事件監聽
*/
privatevoidcuratorCacheSample()throwsException{
CuratorCachecache=null;
try{
StringworkerPath="/watcher_test/node";
cache=CuratorCache.build(ZKTool.getClient(),workerPath);
CuratorCacheListenerlistener=CuratorCacheListener.builder().forCreates((node)->{
logger.info("Nodecreated:{}",node);
}).forChanges((oldNode,node)->{
logger.info("Nodechanged.Old:{},New:{}",oldNode,node);
}).forDeletes((oldNode)->{
logger.info("Nodedeleted.Oldvalue:{}",oldNode);
}).build();
cache.listenable().addListener(listener);
cache.start();
//建立節點
if(!ZKTool.checkExist(workerPath)){
logger.info("建立節點");
ZKTool.createNode(workerPath,CreateMode.PERSISTENT);
}
//修改節點
logger.info("修改節點");
ZKTool.setData(workerPath,String.valueOf(newRandom().nextInt(100)));
//新增子節點
logger.info("新增子節點");
StringsubPath=ZKTool.createNode("/watcher_test/node/sub_",CreateMode.PERSISTENT_SEQUENTIAL);
logger.info("subPath:{}",subPath);
//修改子節點
logger.info("修改子節點");
ZKTool.setData(subPath,String.valueOf(newRandom().nextInt(100)));
//刪除節點
logger.info("刪除節點");
ZKTool.deleteNode("/watcher_test");
LockSupport.parkNanos(5L*1000*1000*1000);
}catch(Exceptione){
throwe;
}finally{
if(cache!=null){
cache.close();
}
}
}
Curator分散式鎖
可靠性高,但效能不如redis。
/**
*curator自帶的分散式鎖示例
*/
privatevoidinterProcessMutexSample(){
InterProcessMutexmutex=newInterProcessMutex(ZKTool.getClient(),"/mutex");
Stringkey="k";
Map<String,Integer>m=newHashMap<>();
m.put(key,0);
intthreadCount=10;
CountDownLatchlatch=newCountDownLatch(threadCount);
longstart=System.currentTimeMillis();
for(inti=0;i<threadCount;i++){
newThread(()->{
try{
mutex.acquire();
m.put(key,m.get(key)+1);
latch.countDown();
logger.info("---{}",m.get(key));
}catch(Exceptione){
logger.error("",e);
}finally{
try{
mutex.release();
}catch(Exceptione){
logger.error("",e);
}
}
}).start();
}
try{
latch.await();
}catch(InterruptedExceptione){
logger.error("",e);
}
longend=System.currentTimeMillis();
longusetime=end-start;
logger.info("value={},time:{}ms",m.get(key),usetime);
logger.info("每一次操作,需要的時間:{}",usetime/threadCount);
}
事務操作
zookeeper的事務操作方式。
// Build three operations (create, setData, delete) and submit them together in one
// transaction() call.
// Payloads are encoded explicitly as UTF-8 so they do not depend on the platform default charset.
CuratorOp createOp = ZKTool.getClient().transactionOp().create()
        .forPath("/apath", "some data".getBytes(java.nio.charset.StandardCharsets.UTF_8));
CuratorOp setDataOp = ZKTool.getClient().transactionOp().setData()
        .forPath("/apath", "other data".getBytes(java.nio.charset.StandardCharsets.UTF_8));
// NOTE(review): presumably "/todel" must already exist for the transaction to succeed — confirm.
CuratorOp deleteOp = ZKTool.getClient().transactionOp().delete().forPath("/todel");

Collection<CuratorTransactionResult> results = ZKTool.getClient().transaction()
        .forOperations(createOp, setDataOp, deleteOp);

// Log the path and operation type of each result.
for (CuratorTransactionResult result : results) {
    logger.info("{} - {}", result.getForPath(), result.getType());
}
--結束--