1. 程式人生 > >Curator場景應用

Curator場景應用

pos sdf AI CI rgs 時間 out keep string

分布式鎖功能:

在分布式場景中,我們為了保證數據的一致性,經常在程序運行的某一個點,需要進行同步操作,(java提供synchronized或者Reentrantlock實現),

使用curator基於zookeeper的特性提供的分布式鎖來處理分布式場景的數據一致性,zookeeper本身的分布式是有寫問題的,這裏推薦使用Curator框架的

InterProcessMutex來實現。

 1 package bjsxt.curator.lock;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 import
java.util.concurrent.CountDownLatch; 6 7 import org.apache.curator.RetryPolicy; 8 import org.apache.curator.framework.CuratorFramework; 9 import org.apache.curator.framework.CuratorFrameworkFactory; 10 import org.apache.curator.framework.recipes.locks.InterProcessMutex; 11 import org.apache.curator.retry.ExponentialBackoffRetry;
12 13 public class Lock2 { 14 15 /** zookeeper地址 */ 16 static final String CONNECT_ADDR = "192.168.2.2:2181"; 17 /** session超時時間 */ 18 static final int SESSION_OUTTIME = 5000;// ms 19 20 static int count = 10; 21 22 public static void genarNo() { 23 try { 24 count--;
25 System.out.println(count); 26 } finally { 27 28 } 29 } 30 31 public static void main(String[] args) throws Exception { 32 33 // 1 重試策略:初試時間為1s 重試10次 34 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); 35 // 2 通過工廠創建連接 36 CuratorFramework cf = CuratorFrameworkFactory.builder() 37 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME) 38 .retryPolicy(retryPolicy) 39 // .namespace("super") 40 .build(); 41 // 3 開啟連接 42 cf.start(); 43 44 // 4 分布式鎖 45 final InterProcessMutex lock = new InterProcessMutex(cf, "/super"); 46 // final ReentrantLock reentrantLock = new ReentrantLock(); 47 final CountDownLatch countdown = new CountDownLatch(1); 48 49 for (int i = 0; i < 10; i++) { 50 new Thread(new Runnable() { 51 @Override 52 public void run() { 53 try { 54 countdown.await(); 55 // 加鎖 56 lock.acquire(); 57 // reentrantLock.lock(); 58 // -------------業務處理開始 59 // genarNo(); 60 SimpleDateFormat sdf = new SimpleDateFormat( 61 "HH:mm:ss|SSS"); 62 System.out.println(sdf.format(new Date())); 63 // System.out.println(System.currentTimeMillis()); 64 // -------------業務處理結束 65 } catch (Exception e) { 66 e.printStackTrace(); 67 } finally { 68 try { 69 // 釋放 70 lock.release(); 71 // reentrantLock.unlock(); 72 } catch (Exception e) { 73 e.printStackTrace(); 74 } 75 } 76 } 77 }, "t" + i).start(); 78 } 79 Thread.sleep(100); 80 countdown.countDown(); 81 82 } 83 }

分布式計數器功能

分布式計數器,在單JVM中,我們可以通過AtomicInteger這種經典的方式實現,但是在分布式的場景下,就需要利用Curator框架的DistributedAtomicInteger來實現

 1 package bjsxt.curator.atomicinteger;
 2 
 3 import org.apache.curator.RetryPolicy;
 4 import org.apache.curator.framework.CuratorFramework;
 5 import org.apache.curator.framework.CuratorFrameworkFactory;
 6 import org.apache.curator.framework.recipes.atomic.AtomicValue;
 7 import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
 8 import org.apache.curator.retry.ExponentialBackoffRetry;
 9 import org.apache.curator.retry.RetryNTimes;
10 
11 public class CuratorAtomicInteger {
12 
13     /** zookeeper地址 */
14     static final String CONNECT_ADDR = "192.168.2.2:2181";
15     /** session超時時間 */
16     static final int SESSION_OUTTIME = 5000;// ms
17 
18     public static void main(String[] args) throws Exception {
19 
20         // 1 重試策略:初試時間為1s 重試10次
21         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
22         // 2 通過工廠創建連接
23         CuratorFramework cf = CuratorFrameworkFactory.builder()
24                 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME)
25                 .retryPolicy(retryPolicy).build();
26         // 3 開啟連接
27         cf.start();
28         // cf.delete().forPath("/super");
29 
30         // 4 使用DistributedAtomicInteger
31         DistributedAtomicInteger atomicIntger = new DistributedAtomicInteger(
32                 cf, "/super", new RetryNTimes(3, 1000));
33 
34         AtomicValue<Integer> value = atomicIntger.add(1);
35         System.out.println(value.succeeded());
36         System.out.println(value.postValue()); // 最新值
37         System.out.println(value.preValue()); // 原始值
38 
39     }
40 }

Curator框架,讓一些很困難的問題,簡單化了

Curator場景應用