1. 程式人生 > 其它 >巨集任務佇列與微任務佇列

巨集任務佇列與微任務佇列

ZooKeeper介紹

官方網站地址:http://zookeeper.apache.org/。
ZooKeeper最初是Hadoop的正式子專案(現已成為Apache頂級專案),它是一個針對大型分散式系統的可靠協調系統,提供的功能包括:配置維護、名字服務、分散式鎖、選舉、事件監聽等。
zookeeper一般以一主多從的模式執行,以保證高可用性。
zookeeper每個節點的資料預設不可超過1MB,且所有資料均載入到記憶體,因此不適用於大量資料的儲存。
ShardingSphere-JDBC + zookeeper + MHA,可以實現mysql的狀態監控、主從自動切換、java應用資料來源自動切換。

Curator

curator是java類庫,專用於操作zookeeper。如下是一些使用示例。

pom.xml

引入依賴。

<!-- Apache Curator artifacts used by the examples below; all three are pinned to version 5.2.0. -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>5.2.0</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.2.0</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>

ZKTool.java

對zookeeper基礎操作的工具類。

package syb.test.busi;

import java.nio.charset.Charset;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;

/**
 * Utility class for basic ZooKeeper operations, built on one shared Curator client.
 *
 * <p>Call {@link #init(String)} once at program start before using any other method,
 * and {@link #close()} once at program shutdown. Every String payload is converted
 * to and from bytes as UTF-8, so what is written by {@code setData}/{@code createNode}
 * is read back unchanged by {@code getData} regardless of the JVM default charset.
 */
public class ZKTool {
	/** Single charset used for every String/byte[] conversion in this class. */
	private static final Charset UTF8 = Charset.forName("UTF-8");

	/** Shared client; {@code null} until {@link #init(String)} has been called. */
	public static CuratorFramework client = null;
	
	/**
	 * Creates and starts the shared client. Intended to run once at program start.
	 *
	 * @param connectionString ZooKeeper connection string passed to Curator
	 */
	public static void init(String connectionString) {
		client = createClientAndStart(connectionString);
	}
	
	/**
	 * Builds a client with an exponential back-off retry policy
	 * (constructor arguments 1000 and 3) and starts it.
	 */
	private static CuratorFramework createClientAndStart(String connectionString) {
		ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
		CuratorFramework newClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
		newClient.start();
		return newClient;
	}

	/**
	 * Closes the shared client if one was created. Intended to run once at program shutdown.
	 */
	public static void close() {
		if (client != null) {
			client.close();
		}
	}
	
	/**
	 * @return the shared client, or {@code null} if {@link #init(String)} has not been called
	 */
	public static CuratorFramework getClient() {
		return client;
	}

	/**
	 * Checks whether a node exists.
	 *
	 * @param path node path
	 * @return {@code true} if the existence check returned a {@code Stat}, {@code false} otherwise
	 * @throws Exception if the ZooKeeper call fails
	 */
	public static boolean checkExist(String path) throws Exception {
		Stat stat = client.checkExists().forPath(path);
		return stat != null;
	}

	/**
	 * Writes {@code data} to the node, encoded as UTF-8.
	 *
	 * @param path node path
	 * @param data new content
	 * @throws Exception if the ZooKeeper call fails
	 */
	public static void setData(String path, String data) throws Exception {
		// Encode explicitly: the readers below decode as UTF-8, so the platform
		// default charset must not be used here.
		client.setData().forPath(path, data.getBytes(UTF8));
	}

	/**
	 * Reads the node content and decodes it as UTF-8.
	 *
	 * @param path node path
	 * @return node content as a String
	 * @throws Exception if the ZooKeeper call fails
	 */
	public static String getData(String path) throws Exception {
		byte[] contentBytes = client.getData().forPath(path);
		return new String(contentBytes, UTF8);
	}

	/**
	 * Reads the node content, decodes it as UTF-8, and passes {@code watcher}
	 * to the read via {@code usingWatcher}.
	 *
	 * @param path    node path
	 * @param watcher watcher attached to this read
	 * @return node content as a String
	 * @throws Exception if the ZooKeeper call fails
	 */
	public static String getData(String path, Watcher watcher) throws Exception {
		byte[] contentBytes = client.getData().usingWatcher(watcher).forPath(path);
		return new String(contentBytes, UTF8);
	}

	/**
	 * Creates a node with an empty payload, creating parent nodes if needed.
	 *
	 * @param path node path
	 * @param cm   create mode
	 * @return the path returned by Curator for the created node
	 * @throws Exception if the ZooKeeper call fails
	 */
	public static String createNode(String path, CreateMode cm) throws Exception {
		return client.create().creatingParentsIfNeeded().withMode(cm).forPath(path, new byte[0]);
	}

	/**
	 * Creates a node holding {@code data} (encoded as UTF-8), creating parent nodes if needed.
	 *
	 * @param path node path
	 * @param cm   create mode
	 * @param data initial content
	 * @return the path returned by Curator for the created node
	 * @throws Exception if the ZooKeeper call fails
	 */
	public static String createNode(String path, CreateMode cm, String data) throws Exception {
		return client.create().creatingParentsIfNeeded().withMode(cm).forPath(path, data.getBytes(UTF8));
	}
	
	/**
	 * Deletes a node, deleting its children too if needed.
	 *
	 * @param path node path
	 * @throws Exception if the ZooKeeper call fails
	 */
	public static void deleteNode(String path) throws Exception {
		client.delete().deletingChildrenIfNeeded().forPath(path);
	}
	
	/**
	 * Lists the children of a node. Note: the list holds child names only, not full paths.
	 *
	 * @param path parent node path
	 * @return names of the child nodes
	 * @throws Exception if the ZooKeeper call fails
	 */
	public static List<String> getChildren(String path) throws Exception {
		return client.getChildren().forPath(path);
	}
}

Watcher事件監聽

Watcher監聽是一次性的:觸發一次後就失效了,如需繼續監聽必須重新註冊。可用於監聽連線斷開等事件。

**
*watcher方式的事件監聽
*/
privatevoidwatcherSample()throwsException{
StringworkerPath="/watcher_test/node";
if(!ZKTool.checkExist(workerPath)){//節點不存在
ZKTool.createNode(workerPath,CreateMode.PERSISTENT);
}
Stringcontent=ZKTool.getData(workerPath,(e)->{
logger.info("收到事件,WatchedEvent:{}",e);
});
logger.info("初始內容:{}",content);

logger.info("變更為1");
ZKTool.setData(workerPath,"1");

LockSupport.parkNanos(5L*1000*1000*1000);
}

curatorCache事件監聽

與Watcher不同,可持續監聽。

/**
*cache方式的事件監聽
*/
privatevoidcuratorCacheSample()throwsException{
CuratorCachecache=null;
try{
StringworkerPath="/watcher_test/node";

cache=CuratorCache.build(ZKTool.getClient(),workerPath);

CuratorCacheListenerlistener=CuratorCacheListener.builder().forCreates((node)->{
logger.info("Nodecreated:{}",node);
}).forChanges((oldNode,node)->{
logger.info("Nodechanged.Old:{},New:{}",oldNode,node);
}).forDeletes((oldNode)->{
logger.info("Nodedeleted.Oldvalue:{}",oldNode);
}).build();

cache.listenable().addListener(listener);
cache.start();

//建立節點
if(!ZKTool.checkExist(workerPath)){
logger.info("建立節點");
ZKTool.createNode(workerPath,CreateMode.PERSISTENT);
}

//修改節點
logger.info("修改節點");
ZKTool.setData(workerPath,String.valueOf(newRandom().nextInt(100)));

//新增子節點
logger.info("新增子節點");
StringsubPath=ZKTool.createNode("/watcher_test/node/sub_",CreateMode.PERSISTENT_SEQUENTIAL);
logger.info("subPath:{}",subPath);

//修改子節點
logger.info("修改子節點");
ZKTool.setData(subPath,String.valueOf(newRandom().nextInt(100)));

//刪除節點
logger.info("刪除節點");
ZKTool.deleteNode("/watcher_test");

LockSupport.parkNanos(5L*1000*1000*1000);
}catch(Exceptione){
throwe;
}finally{
if(cache!=null){
cache.close();
}
}
}

Curator分散式鎖

可靠性高,但效能不如基於redis實現的分散式鎖。

/**
*curator自帶的分散式鎖示例
*/
privatevoidinterProcessMutexSample(){
InterProcessMutexmutex=newInterProcessMutex(ZKTool.getClient(),"/mutex");
Stringkey="k";
Map<String,Integer>m=newHashMap<>();
m.put(key,0);

intthreadCount=10;
CountDownLatchlatch=newCountDownLatch(threadCount);
longstart=System.currentTimeMillis();
for(inti=0;i<threadCount;i++){
newThread(()->{
try{
mutex.acquire();
m.put(key,m.get(key)+1);
latch.countDown();
logger.info("---{}",m.get(key));
}catch(Exceptione){
logger.error("",e);
}finally{
try{
mutex.release();
}catch(Exceptione){
logger.error("",e);
}
}

}).start();
}
try{
latch.await();
}catch(InterruptedExceptione){
logger.error("",e);
}
longend=System.currentTimeMillis();
longusetime=end-start;
logger.info("value={},time:{}ms",m.get(key),usetime);
logger.info("每一次操作,需要的時間:{}",usetime/threadCount);
}

事務操作

zookeeper的事務操作方式。

// Prepare three operations: create /apath, overwrite its data, and delete /todel.
CuratorOp create = ZKTool.getClient().transactionOp().create().forPath("/apath", "some data".getBytes());
CuratorOp update = ZKTool.getClient().transactionOp().setData().forPath("/apath", "other data".getBytes());
CuratorOp remove = ZKTool.getClient().transactionOp().delete().forPath("/todel");

// Submit all three together as one transaction.
Collection<CuratorTransactionResult> outcomes =
        ZKTool.getClient().transaction().forOperations(create, update, remove);

// Log the path and operation type of each result.
outcomes.forEach(outcome -> logger.info(outcome.getForPath() + " - " + outcome.getType()));

--結束--