Zookeeper 自增序列實踐
阿新 • • 發佈:2021-10-31
1、增加依賴
<!-- curator ZK客戶端--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.10.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.10.0</version> </dependency>
2、定義生成sequence型別
/** * 序列型別 */ public enum ZkSequenceEnum { FIRST, SECOND, THIRD }
3、序列封裝
/** * 通過分散式原子自增類(DistributedAtomicLong)實現, * 注意每5000毫秒重試5次後仍然生成失敗則返回null,由上層處理 */ public class ZkSequence { RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000,5); DistributedAtomicLong distributedAtomicLong; public ZkSequence(String sequenceName, CuratorFramework client ){ distributedAtomicLong = new DistributedAtomicLong( client,sequenceName, retryPolicy); } /** * 生成序列 * @return */ public Long sequence() throws Exception{ AtomicValue<Long> sequence = this.distributedAtomicLong.increment(); if(sequence.succeeded()){ return sequence.postValue(); }else { return null; } } }
4、配置檔案
@Configuration @ConfigurationProperties(prefix = "zk") @PropertySource("classpath:zookeeper.properties") public class ZkConfig { String host = "118.xx.xx.101"; String sequencePath = "/new/sequence/"; @Bean public ZookeeperClient zookeeperClient() { return new ZookeeperClient(this.host, this.sequencePath); } }
5、客戶端封裝
public class ZookeeperClient { private String host; private String sequencePath; // 重試休眠時間 private final int SLEEP_TIME_MS = 1000; // 最大重試1000次 private final int MAX_RETRIES = 1000; //會話超時時間 private final int SESSION_TIMEOUT = 30 * 1000; //連線超時時間 private final int CONNECTION_TIMEOUT = 3 * 1000; //建立連線例項 private CuratorFramework client = null; // 序列化集合 private Map<String, ZkSequence> zkSequence = Maps.newConcurrentMap(); public ZookeeperClient(String host, String sequencePath) { this.host = host; this.sequencePath = sequencePath; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } @PostConstruct public void init() throws Exception { this.client = CuratorFrameworkFactory.builder() .connectString(this.getHost()) .connectionTimeoutMs(CONNECTION_TIMEOUT) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES)) .build(); this.client.start(); this.initZkSequence(); } public void initZkSequence() { ZkSequenceEnum[] list = ZkSequenceEnum.values(); for (int i = 0; i < list.length; i++) { String name = list[i].name(); String path = this.sequencePath + name; ZkSequence seq = new ZkSequence(path, this.client); zkSequence.put(name, seq); } } /*** 生成SEQ */ public Long sequence(ZkSequenceEnum name) { Long result = null; try { ZkSequence seq = zkSequence.get(name.name()); if (seq != null) { result = seq.sequence(); } } catch (Exception e) { System.out.println("獲取" + name + "Sequence錯誤: " + e.getMessage()); } return result; } }
5、呼叫客戶端
@Component public class Sequences { @Autowired private ZookeeperClient client; public Long sequenceFist(){ return this.client.sequence(ZkSequenceEnum.FIRST); } public Long sequenceSecond(){ return this.client.sequence(ZkSequenceEnum.SECOND); } public Long sequenceThird(){ return this.client.sequence(ZkSequenceEnum.THIRD); } }
6、測試
@SpringBootTest class ZookeepdemoApplicationTests { // 第一步,注入Sequences @Autowired private Sequences sequences; @Test void sequenceApCollection() throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5)); for (int i = 0; i < 10; i++) { MyTask myTask = new MyTask(sequences); executor.execute(myTask); } Thread.sleep(100000); executor.shutdown(); } private class MyTask implements Runnable { private Sequences sequences; public MyTask(Sequences sequences) { this.sequences = sequences; } @Override public void run() { // 第二步,在方法中呼叫生成 Long num = sequences.sequenceFist(); System.out.println("num=" + num); } } }
分別啟動兩個測試工程
作者:Work Hard Work Smart
出處:http://www.cnblogs.com/linlf03/
歡迎任何形式的轉載,未經作者同意,請保留此段宣告!