Curator場景應用
阿新 • • 發佈:2018-05-31
pos sdf AI CI rgs 時間 out keep string
分布式鎖功能:
在分布式場景中,我們為了保證數據的一致性,經常在程序運行的某一個點,需要進行同步操作,(java提供synchronized或者Reentrantlock實現),
使用curator基於zookeeper的特性提供的分布式鎖來處理分布式場景的數據一致性,zookeeper本身的分布式是有寫問題的,這裏推薦使用Curator框架的
InterProcessMutex來實現。
1 package bjsxt.curator.lock; 2 3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 importjava.util.concurrent.CountDownLatch; 6 7 import org.apache.curator.RetryPolicy; 8 import org.apache.curator.framework.CuratorFramework; 9 import org.apache.curator.framework.CuratorFrameworkFactory; 10 import org.apache.curator.framework.recipes.locks.InterProcessMutex; 11 import org.apache.curator.retry.ExponentialBackoffRetry;12 13 public class Lock2 { 14 15 /** zookeeper地址 */ 16 static final String CONNECT_ADDR = "192.168.2.2:2181"; 17 /** session超時時間 */ 18 static final int SESSION_OUTTIME = 5000;// ms 19 20 static int count = 10; 21 22 public static void genarNo() { 23 try { 24 count--;25 System.out.println(count); 26 } finally { 27 28 } 29 } 30 31 public static void main(String[] args) throws Exception { 32 33 // 1 重試策略:初試時間為1s 重試10次 34 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); 35 // 2 通過工廠創建連接 36 CuratorFramework cf = CuratorFrameworkFactory.builder() 37 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME) 38 .retryPolicy(retryPolicy) 39 // .namespace("super") 40 .build(); 41 // 3 開啟連接 42 cf.start(); 43 44 // 4 分布式鎖 45 final InterProcessMutex lock = new InterProcessMutex(cf, "/super"); 46 // final ReentrantLock reentrantLock = new ReentrantLock(); 47 final CountDownLatch countdown = new CountDownLatch(1); 48 49 for (int i = 0; i < 10; i++) { 50 new Thread(new Runnable() { 51 @Override 52 public void run() { 53 try { 54 countdown.await(); 55 // 加鎖 56 lock.acquire(); 57 // reentrantLock.lock(); 58 // -------------業務處理開始 59 // genarNo(); 60 SimpleDateFormat sdf = new SimpleDateFormat( 61 "HH:mm:ss|SSS"); 62 System.out.println(sdf.format(new Date())); 63 // System.out.println(System.currentTimeMillis()); 64 // -------------業務處理結束 65 } catch (Exception e) { 66 e.printStackTrace(); 67 } finally { 68 try { 69 // 釋放 70 lock.release(); 71 // reentrantLock.unlock(); 72 } catch (Exception e) { 73 e.printStackTrace(); 74 } 75 } 76 } 77 }, "t" + i).start(); 78 } 79 Thread.sleep(100); 80 countdown.countDown(); 81 82 } 83 }
分布式計數器功能
分布式計數器,在單JVM中,我們可以通過AtomicInteger這種經典的方式實現,但是在分布式的場景下,就需要利用Curator框架的DistributedAtomicInteger來實現
1 package bjsxt.curator.atomicinteger; 2 3 import org.apache.curator.RetryPolicy; 4 import org.apache.curator.framework.CuratorFramework; 5 import org.apache.curator.framework.CuratorFrameworkFactory; 6 import org.apache.curator.framework.recipes.atomic.AtomicValue; 7 import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger; 8 import org.apache.curator.retry.ExponentialBackoffRetry; 9 import org.apache.curator.retry.RetryNTimes; 10 11 public class CuratorAtomicInteger { 12 13 /** zookeeper地址 */ 14 static final String CONNECT_ADDR = "192.168.2.2:2181"; 15 /** session超時時間 */ 16 static final int SESSION_OUTTIME = 5000;// ms 17 18 public static void main(String[] args) throws Exception { 19 20 // 1 重試策略:初試時間為1s 重試10次 21 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); 22 // 2 通過工廠創建連接 23 CuratorFramework cf = CuratorFrameworkFactory.builder() 24 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME) 25 .retryPolicy(retryPolicy).build(); 26 // 3 開啟連接 27 cf.start(); 28 // cf.delete().forPath("/super"); 29 30 // 4 使用DistributedAtomicInteger 31 DistributedAtomicInteger atomicIntger = new DistributedAtomicInteger( 32 cf, "/super", new RetryNTimes(3, 1000)); 33 34 AtomicValue<Integer> value = atomicIntger.add(1); 35 System.out.println(value.succeeded()); 36 System.out.println(value.postValue()); // 最新值 37 System.out.println(value.preValue()); // 原始值 38 39 } 40 }
Curator框架,讓一些很困難的問題,簡單化了
Curator場景應用