1. 程式人生 > 其它 >Zookeeper 自增序列實踐

Zookeeper 自增序列實踐

1、增加依賴

        <!--        curator ZK客戶端-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.10.0</version>
        </dependency>

  

2、定義生成sequence型別

/**
 * 序列型別
 */
public enum ZkSequenceEnum {
    FIRST,
    SECOND,
    THIRD
}

  

3、序列封裝

/**
 * 通過分散式原子自增類(DistributedAtomicLong)實現,
 * 注意每5000毫秒重試5次後仍然生成失敗則返回null,由上層處理
 */
public class ZkSequence {

    
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000,5);

    DistributedAtomicLong distributedAtomicLong;

    public ZkSequence(String sequenceName, CuratorFramework client ){
        distributedAtomicLong = new DistributedAtomicLong( client,sequenceName, retryPolicy);
    }
    
    /**
     * 生成序列
     * @return
     */
    public  Long sequence() throws  Exception{
        AtomicValue<Long> sequence = this.distributedAtomicLong.increment();
        if(sequence.succeeded()){
            return  sequence.postValue();
        }else {
            return  null;
        }
    }

}

4、配置檔案

@Configuration
@ConfigurationProperties(prefix = "zk")
@PropertySource("classpath:zookeeper.properties")
public class ZkConfig {
    String host = "118.xx.xx.101";
    String sequencePath = "/new/sequence/";


    @Bean
    public ZookeeperClient zookeeperClient() {
        return new ZookeeperClient(this
.host, this.sequencePath); } }

  

5、客戶端封裝

public class ZookeeperClient {

    private String host;
    private String sequencePath;
    // 重試休眠時間
    private final int SLEEP_TIME_MS = 1000;
    // 最大重試1000次
    private final int MAX_RETRIES = 1000;
    //會話超時時間
    private final int SESSION_TIMEOUT = 30 * 1000;
    //連線超時時間
    private final int CONNECTION_TIMEOUT = 3 * 1000;
    //建立連線例項
    private CuratorFramework client = null;
    // 序列化集合
    private Map<String, ZkSequence> zkSequence = Maps.newConcurrentMap();

    public ZookeeperClient(String host, String sequencePath) {
        this.host = host;
        this.sequencePath = sequencePath;
    }


    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    @PostConstruct
    public void init() throws Exception {
        this.client = CuratorFrameworkFactory.builder()
                .connectString(this.getHost())
                .connectionTimeoutMs(CONNECTION_TIMEOUT)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES))
                .build();
        this.client.start();
        this.initZkSequence();
    }

    public void initZkSequence() {
        ZkSequenceEnum[] list = ZkSequenceEnum.values();
        for (int i = 0; i < list.length; i++) {
            String name = list[i].name();
            String path = this.sequencePath + name;
            ZkSequence seq = new ZkSequence(path, this.client);
            zkSequence.put(name, seq);
        }
    }

    /*** 生成SEQ  */
    public Long sequence(ZkSequenceEnum name) {
        Long result = null;
        try {
            ZkSequence seq = zkSequence.get(name.name());
            if (seq != null) {
                result = seq.sequence();
            }

        } catch (Exception e) {
            System.out.println("獲取" + name + "Sequence錯誤: " + e.getMessage());
        }
        return result;
    }

}

  

5、呼叫客戶端

@Component
public class Sequences {

    @Autowired
    private ZookeeperClient client;

    public Long sequenceFist(){
        return this.client.sequence(ZkSequenceEnum.FIRST);
    }

    public Long sequenceSecond(){
        return this.client.sequence(ZkSequenceEnum.SECOND);
    }

    public Long sequenceThird(){
        return this.client.sequence(ZkSequenceEnum.THIRD);
    }

 }

  

6、測試

@SpringBootTest
class ZookeepdemoApplicationTests {


    // 第一步,注入Sequences
    @Autowired
    private Sequences sequences;


    @Test
    void sequenceApCollection() throws Exception {


        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));

        for (int i = 0; i < 10; i++) {
            MyTask myTask = new MyTask(sequences);
            executor.execute(myTask);
        }
        Thread.sleep(100000);
        executor.shutdown();


    }

    private class MyTask implements Runnable {

        private Sequences sequences;

        public MyTask(Sequences sequences) {
            this.sequences = sequences;
        }

        @Override
        public void run() {
            // 第二步,在方法中呼叫生成
            Long num = sequences.sequenceFist();
            System.out.println("num=" + num);

        }
    }

} 

分別啟動兩個測試工程

作者:Work Hard Work Smart
出處:http://www.cnblogs.com/linlf03/
歡迎任何形式的轉載,未經作者同意,請保留此段宣告!