1. 程式人生 > >zookeeper之curator

zookeeper之curator

Curator引數說明

1.connectString zookeeper伺服器的連線

2.retryPolicy 重試策略,預設有四個ExponentialBackoffRetry、RetryNtime、Retryonetime、RetryUtilElapsed

3.sessionTimeoutMs 回話超時時間,預設60 000ms

4.connectionTimeoutMs 連線建立超時時間, 預設15 000ms

RetryPolicy

1.retryCount 已經重試的次數,如果是第一次,那麼改值為0

2.elapsedTimeMs 從第一次嘗試開始已經花費的時間

3.Sleepeer 用於sleep指定時間。不建議使用Thread.sleep()操作

ExponentialBackoffRetry

1.baseSleepTimeMs 初始sleep時間

2.maxRetries 最大重試次數

3.maxSleepMs 最大sleep時間

典型場景使用 

1.事件監聽 簡化原生zookeeper api反覆註冊監聽

NodeCache --監控當前節點資料變化 

 1.client 客戶端例項 

 2.path 資料節點的路徑 

 3.dataIsCompressed 是否進行資料壓縮

package demo6;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

import demo4.MyConnectionStateListener;

public class Client {
	
	private static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181";
	private static CuratorFramework client = null;
	
	public static void main(String[] args) {
		try {
			client = createSimple(connectString);
			client.start();
			MyConnectionStateListener stateListener = new MyConnectionStateListener("/app2", "this is recon");
			client.getConnectionStateListenable().addListener(stateListener);
			/**
			 * 監控當前節點資料變化
			*/
			final NodeCache nodeCache = new NodeCache(client, "/app2", false);
			nodeCache.start(true);
			nodeCache.getListenable().addListener(new NodeCacheListener() {
				
				@Override
				public void nodeChanged() throws Exception {
					System.out.println("current data : "+nodeCache.getCurrentData().getData());
				}
			}); 

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	private static CuratorFramework createSimple(String connects) {
		ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
		//DEFAULT_SESSION_TIMEOUT_MS = Integer.getInteger("curator-default-session-timeout", 60 * 1000);
	    //DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
		return CuratorFrameworkFactory.newClient(connects, retryPolicy);
	}
	
	 public static CuratorFramework createWithOptions(String connects, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
	        // using the CuratorFrameworkFactory.builder() gives fine grained control
	        // over creation options. See the CuratorFrameworkFactory.Builder javadoc details
	        return CuratorFrameworkFactory.builder().connectString(connects)
	                .retryPolicy(retryPolicy)
	                .connectionTimeoutMs(connectionTimeoutMs)
	                .sessionTimeoutMs(sessionTimeoutMs)
	                // etc. etc.
	                .build();
	    }

}


PathChildrenCache  --監控當前節點子節點的變化 新增 資料改變 刪除

 1.client 客戶端例項 

 2.path 資料節點的路徑 

 3.dataIsCompressed 是否進行資料壓縮 

 4.cacheDate 是否把節點內容快取起來,如果true,收到節點的資料內容同時也能夠獲取節點的資料內容 

 5.threadFactory and executor Service 構造單獨執行緒池處理事件通知

package demo6;

import java.util.concurrent.CountDownLatch;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Client2 {
	
	private static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181";
	private static CuratorFramework client = null;
	private static CountDownLatch cl = new CountDownLatch(1);
	
	public static void main(String[] args) {
		try {
			client = createSimple(connectString);
			client.start();
//			MyConnectionStateListener stateListener = new MyConnectionStateListener("/app2", "this is recon");
//			client.getConnectionStateListenable().addListener(stateListener);
			/**
			 * 監控當前節點子節點的變化 新增 資料改變 刪除
			*/
			PathChildrenCache cache = new PathChildrenCache(client, "/app2", true);
			cache.start(StartMode.POST_INITIALIZED_EVENT);
			cache.getListenable().addListener(new PathChildrenCacheListener() {
				
				@Override
				public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
						throws Exception {
					switch (event.getType()){
					case CHILD_ADDED:
						System.out.println("add:"+event.getData().getPath());
						break;
					case CHILD_REMOVED:
						System.out.println("remove:"+event.getData().getPath());
						break;
					case CHILD_UPDATED:
						System.out.println("data change");
					    break;
					case CONNECTION_LOST:
						System.out.println("lost");
						break;
					case CONNECTION_RECONNECTED:
						System.out.println("recon");
						break;
					case CONNECTION_SUSPENDED:
						System.out.println("susp");
						break;
					default:
						break;
					}
				}
			});
			cl.await();
			

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	private static CuratorFramework createSimple(String connects) {
		ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
		//DEFAULT_SESSION_TIMEOUT_MS = Integer.getInteger("curator-default-session-timeout", 60 * 1000);
	    //DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
		return CuratorFrameworkFactory.newClient(connects, retryPolicy);
	}
	
	 public static CuratorFramework createWithOptions(String connects, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
	        // using the CuratorFrameworkFactory.builder() gives fine grained control
	        // over creation options. See the CuratorFrameworkFactory.Builder javadoc details
	        return CuratorFrameworkFactory.builder().connectString(connects)
	                .retryPolicy(retryPolicy)
	                .connectionTimeoutMs(connectionTimeoutMs)
	                .sessionTimeoutMs(sessionTimeoutMs)
	                // etc. etc.
	                .build();
	    }

}

Master選舉

對於複雜得得任務,僅需要叢集中一臺伺服器進行處理,可以進行master選舉
package demo7;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Client1 {

	static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181";
	static String master_path = "/master";
	static CuratorFramework client = CuratorFrameworkFactory.builder()
										.connectString(connectString)
										.retryPolicy(new ExponentialBackoffRetry(1000, 3))
										.build();
	
	public static void main(String[] args) throws Exception {
		client.start();
		LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() {
			
			@Override
			public void takeLeadership(CuratorFramework client) throws Exception {
				//執行完後,後續的才開始獲取鎖,執行改方法
				System.out.println("client 1 成為 master");
				Thread.sleep(6000);
				System.out.println("client 1 完成操作,釋放 master權利");
			}
		});
		//釋放master後,重新排隊
		selector.autoRequeue();
		selector.start();
		Thread.sleep(Integer.MAX_VALUE);
	}

}

分散式鎖

package demo8;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * 分散式鎖
 * @author ywd
 *
 */
public class Client1 {

	static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181";
	static String path = "/lock";
	static CuratorFramework client = CuratorFrameworkFactory.builder()
										.connectString(connectString)
										.retryPolicy(new ExponentialBackoffRetry(1000, 3))
										.build();
	
	public static void main(String[] args) throws Exception {
		client.start();
		final CountDownLatch downLatch = new CountDownLatch(1);
		final InterProcessMutex lock = new InterProcessMutex(client, path);
		for (int i = 0; i < 30; i++) {
			new Thread(new Runnable(){

				@Override
				public void run() {
					try {
						downLatch.await();
						//獲取鎖
						lock.acquire();
					} catch (Exception e) {
					}
					SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
					String orderNo = sdf.format(new Date());
					System.out.println("生成訂單號是:" + orderNo);
					try {
						//釋放鎖
						lock.release();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
				
			}).start();
		}
		downLatch.countDown();
	}

}
package demo8;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;

/**
 * 分散式計數器
 * @author ywd
 *
 */
public class Client2 {

	static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181";
	static String path = "/lock";
	static CuratorFramework client = CuratorFrameworkFactory.builder()
										.connectString(connectString)
										.retryPolicy(new ExponentialBackoffRetry(1000, 3))
										.build();
	
	public static void main(String[] args) throws Exception {
		client.start();
		DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client, path, new RetryNTimes(3, 1000));
		AtomicValue<Integer> rc = atomicInteger.add(8);
		System.out.println("Result: " + rc.succeeded());
	}

}




相關推薦

zookeepercurator

Curator引數說明 1.connectString zookeeper伺服器的連線 2.retryPolicy 重試策略,預設有四個ExponentialBackoffRetry、RetryNtime、Retryonetime、RetryUtilElapsed 3.s

zookeeperCurator框架(CRUD/事務/選舉/鎖)的使用

Curator框架是最好用,最流行的zookeeper的客戶端。 它有以下三個優點 1.提供了一套非常友好的操作API; 2. 提供一些高階特性(包括但不僅限於前篇文章中提到的)的封裝 3.易測試 maven依賴如下 [html] view plainco

zookeeper入門curator框架--幾種鎖的操作

package com.git.zookeeper.passwordmanager.lock; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import or

Zookeeper基於Observer部署架構

sso 解決 簡單 也不會 http 架構 即使 jsb 例如 Observers:在不傷害寫性能的情況下擴展Zookeeper 雖然通過Client直接連接到Zookeeper集群的性能已經很好了,可是這樣的架構假設要承受超大規模的Client,就必須添加Zookee

ZookeeperZookeeper底層客戶端架構實現原理(轉載)

一次 描述 綁定 機制 一個 ini fin 源碼 receive Zookeeper的Client直接與用戶打交道,是我們使用Zookeeper的interface。了解ZK Client的結構和工作原理有利於我們合理的使用ZK,並能在使用中更早的發現問題。本文將在研究源

zookeeper使用curator進行選主(Leader)

delet ram href try eat 處理方式 連接 監聽 prev 在分布式系統設計中,選主是一個常見的場景。選主是一個這樣的過程,通過選主,主節點被選擇出來控制其他節點或者是分配任務。 選主算法要滿足的幾個特征: 1)各個節點均衡的獲得成為主節點的權利,一旦主節

品味ZooKeeperWatcher機制_2

而已 s函數 boolean 搜索樹 stat 數據同步 ron 例如 chain 品味ZooKeeper之Watcher機制 本文思維導圖如下: 前言 Watcher機制是zookeeper最重要三大特性數據節點Znode+Watcher機制+ACL權限控制中的其中一個

JAVA 連接 ZooKeeper初體驗

dcom fall contex level rect trace adp acl -i Java連接Zookeeper 一、配置zk環境 本人使用的是虛擬機,創建了兩臺linux服務器(安裝過程百度上很多) 準備zk的安裝包,zookeeper-3.4.10.tar.g

Zookeeper入門(原理、基礎知識)

otto 執行 width delete 2.3 命名 類型 keep gin Zookeeper介紹 Zookeeper是分布式應用程序的協調服務框架,是Hadoop的重要組件。ZK要解決的問題: 1.分布式環境下的數據一致性。 2.分布式環境下的統一命名服務 3.分布式

zookeeper Apache Curator

1 簡介     Curator是Netflix公司開源的一套Zookeeper客戶端框架。瞭解過Zookeeper原生API都會清楚其複雜度。Curator幫助我們在其基礎上進行封裝、實現一些開發細節,包括接連重連、反覆註冊Watcher和NodeExistsExcep

ZooKeeperSession

        會話是ZooKeeper中最重要的概念之一,客戶端與服務端之間的互動操作就是依賴於ZooKeeper的會話機制。接下來分別從客戶端與服務端兩方面來講解會話的建立流程與實現細節。 1.建立會話客戶端流程     &

zookeeper客戶端和服務端的區別

客戶端是叢集外的訪問,服務端才是叢集上的提供服務的。   使用bin/zkServer.sh start開啟的zookeeper上的一個服務端,而使用bin/zkCli.sh是將客戶端連到服務端上。 客戶端可以通過服務端建立znode,刪除znode,寫znode,讀znod

zookeeper 簡單操作

ZooKeeper的資料模型及其API支援以下九個基本操作: 我們來使用ZooKeeper Java shell對上面中提到的ZooKeeper操作進行演示: 用ThisIsTheRootNode作為資料建立一個名為root的znode: [zk: localhost(CONN

zookeeper場景與架構-《每日五分鐘搞定大資料》

Zookeeper作為一個分散式協調系統提供了一項基本服務:分散式鎖服務,分散式鎖是分散式協調技術實現的核心內容。像配置管理、任務分發、組服務、分散式訊息佇列、分散式通知/協調等,這些應用實際上都是基於這項基礎服務由使用者自己摸索出來的。 1.Zookeeper在大資料系統中的常見應用 zookeepe

zookeeper運維

文章目錄版本宣告JMX叢集高可用水平擴容磁碟管理 版本宣告 Zookeeper版本3.4.6 配置檔案詳解 單擊配置 引數名 預設值 描述 clientPort 無預設值,必須

zookeeper應用與實現

Leader Elections(leader選舉) 指派一個程序作為組織者,將任務分發給各節點。在任務開始前,哪個節點都不知道誰是leader(領導者)或者coordinator(協調者)。當選舉演算法開始執行後,每個節點最終會得到一個唯一的節點作為任務l

zookeeper八】Zookeeper會話

一、前言   前面分析了Zookeeper客戶端的細節,接著繼續學習Zookeeper中的一個非常重要的概念:會話。 二、會話   客戶端與服務端之間任何互動操作都與會話息息相關,如臨時節點的生命週期、客戶端請求的順序執行、Watcher通知機制等。Zookeeper的連線與會話就是客戶

zookeeper七】Zookeeper客戶端

一、前言   前篇部落格分析了Zookeeper的序列化和通訊協議,接著繼續學習客戶端,客戶端是開發人員使用Zookeeper最主要的途徑,很有必要弄懂客戶端是如何與服務端通訊的。 二、客戶端   2.1 客戶端組成   Zookeeper客戶端主要由如下核心部件構成。   1.

ZooKeeper框架Curator的分散式鎖原始碼分析

上一篇文章中,我們使用zookeeper的java api實現了分散式排他鎖。其實zookeeper有一個優秀的框架---Curator,提供了各種分散式協調的服務。Curator中有著更為標準、規範的分散式鎖實現。與其我們自己去實現,不如直接使用Curator。通過學習Cu

ZooKeeperLeader選舉

        Leader選舉是ZooKeeper中最重要的技術之一,也是保證分散式資料一致性的關鍵所在。 1.leader選舉          Leader選舉主要分為伺服器啟動時期的Leader選舉和伺服器執行期間的Leader選舉。以下是兩種情況的選舉大致步驟。