1. 程式人生 > >利用Redis實現叢集或開發環境下SnowFlake自動配置機器號

利用Redis實現叢集或開發環境下SnowFlake自動配置機器號

前言:

SnowFlake 雪花ID 演算法是推特公司推出的著名分散式ID生成演算法。利用預先分配好的機器ID,工作區ID,機器時間可以生成全域性唯一的隨時間趨勢遞增的Long型別ID.長度在17-19位。隨著時間的增長而遞增,在MySQL資料庫中,InnoDB儲存引擎可以更快的插入遞增的主鍵。而不像UUID那樣因為寫入是亂序的,InnoDB不得不頻繁的做頁分裂操作,耗時且容易產生碎片。

對於SnowFlake 的原理介紹,可以參考該文章:理解分散式id生成演算法SnowFlake

理解了雪花的基本原理之後,我們試想:在分散式叢集或者開發環境下,不同服務之間/相同服務的不同機器之間應該如何產生差異呢?有以下幾種方案:

  1. 通過在 yml 檔案中配置不同的引數,啟動 spring 容器時通過讀取該引數來實現不同服務與不同機器的workerId不同。但是這裡不方便新增機器/新同事的自動化配置
  2. 向第三方應用如zookeeper、Redis中註冊ID,以獲得唯一的ID。
  3. 對於開發環境,可以取機器的IP後三位。因為大家在一個辦公室的話IP後三位肯定是0-255之前不重複。但是這樣機器ID需要8個Bit,留給資料中心的位數就只有4個了。

本方案結合了以上方案的優點,按照業務的實際情況對雪花中的資料中心和機器ID所佔的位數進行調整:資料中心佔4Bit,範圍從0-15。機器ID佔6Bit,範圍從0-63
。對不同的服務在yml中配置服務名稱,以服務編號作為資料中心ID。如果按照開發+測試+生產環境區分的話,可以部署5個不同的服務。application.yml 中配置如下的引數

# 分散式雪花ID不同機器ID自動化配置
snowFlake:
  dataCenter: 1 # 資料中心的id
  appName: test # 業務型別名稱

而機器ID採用以下的策略實現:

  1. 獲取當前機器的IP地址 localIp,模32,獲得0-31的整數 machineId
  2. 向Redis中註冊,使用 appName + dataCenter + machineId 作為key ,以本機IP localIp 作為 value。
  3. 註冊成功後,設定鍵過期時間 24 h,並開啟一個計時器,在 23h 後更新註冊的 key
  4. 如果註冊失敗,可能有以下兩個原因:
    1. 上次服務異常中斷,沒有來得及刪除key。這裡的解決方案是通過key獲取value,如果value和localIp一致,則仍然視為註冊成功
    2. IP和別人的IP模32的結果一樣,導致機器ID衝突。這是就遍歷 0-31 獲取其中為註冊的數字作為本機的機器號
  5. 如果不幸Redis連線失敗,系統將從32-63之間隨機獲取ID,並使用 log.error() 列印醒目的提示訊息這裡建議IDEA + Grep Console 實現不同級別的日誌不同前景色顯示,方便及時獲取錯誤資訊
  6. 當服務停止前,向Redis傳送請求,刪除該Key的佔用。

具體的程式碼如下:

自動配置機器ID,並在容器啟動時放入SnowFlake例項物件

package cn.keats.util;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

@Configuration
@Slf4j
public class MachineIdConfig {
    @Resource
    private JedisPool jedisPool;

    @Value("${snowFlake.dataCenter}")
    private Integer dataCenterId;

    @Value("${snowFlake.appName}")
    private String APP_NAME;

    /**
     * 機器id
     */
    public static Integer machineId;
    /**
     * 本地ip地址
     */
    private static String localIp;

    /**
     * 獲取ip地址
     *
     * @return
     * @throws UnknownHostException
     */
    private String getIPAddress() throws UnknownHostException {
        InetAddress address = InetAddress.getLocalHost();
        return address.getHostAddress();
    }

    /**
     * hash機器IP初始化一個機器ID
     */
    @Bean
    public SnowFlake initMachineId() throws Exception {
        localIp = getIPAddress(); // 192.168.0.233

        Long ip_ = Long.parseLong(localIp.replaceAll("\\.", ""));// 1921680233
        //
        machineId = ip_.hashCode() % 32;// 0-31
        // 建立一個機器ID
        createMachineId();

        log.info("初始化 machine_id :{}", machineId);
        return new SnowFlake(machineId, dataCenterId);
    }

    /**
     * 容器銷燬前清除註冊記錄
     */
    @PreDestroy
    public void destroyMachineId() {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.del(APP_NAME + dataCenterId + machineId);
        }
    }


    /**
     * 主方法:首先獲取機器 IP 並 % 32 得到 0-31
     * 使用 業務名 + 組名 + IP 作為 Redis 的 key,機器IP作為 value,儲存到Redis中
     *
     * @return
     */
    public Integer createMachineId() {
        try {
            // 向redis註冊,並設定超時時間
            log.info("註冊一個機器ID到Redis " + machineId + " IP:" + localIp);
            Boolean flag = registerMachine(machineId, localIp);
            // 註冊成功
            if (flag) {
                // 啟動一個執行緒更新超時時間
                updateExpTimeThread();
                // 返回機器Id
                log.info("Redis中埠沒有衝突 " + machineId + " IP:" + localIp);
                return machineId;
            }
            // 註冊失敗,可能原因 Hash%32 的結果衝突
            if (!checkIfCanRegister()) {
                // 如果 0-31 已經用完,使用 32-64之間隨機的ID
                getRandomMachineId();
                createMachineId();
            } else {
                // 如果存在剩餘的ID
                log.warn("Redis中埠衝突了,使用 0-31 之間未佔用的Id " + machineId + " IP:" + localIp);
                createMachineId();
            }
        } catch (Exception e) {
            // 獲取 32 - 63 之間的隨機Id
            // 返回機器Id
            log.error("Redis連線異常,不能正確註冊雪花機器號 " + machineId + " IP:" + localIp, e);
            log.warn("使用臨時方案,獲取 32 - 63 之間的隨機數作為機器號,請及時檢查Redis連線");
            getRandomMachineId();
            return machineId;
        }
        return machineId;
    }

    /**
     * 檢查是否被註冊滿了
     *
     * @return
     */
    private Boolean checkIfCanRegister() {
        // 判斷0~31這個區間段的機器IP是否被佔滿
        try (Jedis jedis = jedisPool.getResource()) {
            Boolean flag = true;
            for (int i = 0; i < 32; i++) {
                flag = jedis.exists(APP_NAME + dataCenterId + i);
                // 如果不存在。設定機器Id為這個不存在的數字
                if (!flag) {
                    machineId = i;
                    break;
                }
            }
            return !flag;
        }
    }

    /**
     * 1.更新超時時間
     * 注意,更新前檢查是否存在機器ip佔用情況
     */
    private void updateExpTimeThread() {
        // 開啟一個執行緒執行定時任務:
        // 每23小時更新一次超時時間
        new Timer(localIp).schedule(new TimerTask() {
            @Override
            public void run() {
                // 檢查快取中的ip與本機ip是否一致, 一致則更新時間,不一致則重新獲取一個機器id
                Boolean b = checkIsLocalIp(String.valueOf(machineId));
                if (b) {
                    log.info("IP一致,更新超時時間 ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
                    try (Jedis jedis = jedisPool.getResource()) {
                        jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24 );
                    }
                } else {
                    // IP衝突
                    log.info("重新生成機器ID ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
                    // 重新生成機器ID,並且更改雪花中的機器ID
                    getRandomMachineId();
                    // 重新生成並註冊機器id
                    createMachineId();
                    // 更改雪花中的機器ID
                    SnowFlake.setWorkerId(machineId);
                    // 結束當前任務
                    log.info("Timer->thread->name:{}", Thread.currentThread().getName());
                    this.cancel();
                }
            }
        }, 10 * 1000, 1000 * 60 * 60 * 23);
    }

    /**
     * 獲取32-63隨機數
     */
    public void getRandomMachineId() {
        machineId = (int) (Math.random() * 31) + 31;
    }


    /**
     * 檢查Redis中對應Key的Value是否是本機IP
     *
     * @param mechineId
     * @return
     */
    private Boolean checkIsLocalIp(String mechineId) {
        try (Jedis jedis = jedisPool.getResource()) {
            String ip = jedis.get(APP_NAME + dataCenterId + mechineId);
            log.info("checkIsLocalIp->ip:{}", ip);
            return localIp.equals(ip);
        }
    }

    /**
     * 1.註冊機器
     * 2.設定超時時間
     *
     * @param machineId 取值為0~31
     * @return
     */
    private Boolean registerMachine(Integer machineId, String localIp) throws Exception {
        // try with resources 寫法,出異常會釋放括號內的資源 Java7特性
        try (Jedis jedis = jedisPool.getResource()) {
            // key 業務號 + 資料中心ID + 機器ID value 機器IP
            Long result = jedis.setnx(APP_NAME + dataCenterId + machineId, localIp);
            if(result == 1){
                // 過期時間 1 天
                jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24);
                return true;
            } else {
                // 如果Key存在,判斷Value和當前IP是否一致,一致則返回True
                String value = jedis.get(APP_NAME + dataCenterId + machineId);
                if(localIp.equals(value)){
                    // IP一致,註冊機器ID成功
                    jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24);
                    return true;
                }
                return false;
            }
        }
    }
}

雪花ID:

import org.springframework.context.annotation.Configuration;

/**
 * 功能:分散式ID生成工具類
 *
 */
@Configuration
public class SnowFlake {
    /**
     * 開始時間截 (2019-09-08) 服務一旦執行過之後不能修改。會導致ID生成重複
     */
    private final long twepoch = 1567872000000L;

    /**
     * 機器Id所佔的位數 0 - 64
     */
    private final long workerIdBits = 6L;

    /**
     * 工作組Id所佔的位數 0 - 16
     */
    private final long dataCenterIdBits = 4L;

    /**
     * 支援的最大機器id,結果是63 (這個移位演算法可以很快的計算出幾位二進位制數所能表示的最大十進位制數)
     */
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

    /**
     * 支援的最大資料標識id,結果是15
     */
    private final long maxDatacenterId = -1L ^ (-1L << dataCenterIdBits);

    /**
     * 序列在id中佔的位數
     */
    private final long sequenceBits = 12L;

    /**
     * 機器ID向左移12位
     */
    private final long workerIdShift = sequenceBits;

    /**
     * 資料標識id向左移17位(12+5)
     */
    private final long datacenterIdShift = sequenceBits + workerIdBits;

    /**
     * 時間截向左移22位(5+5+12)
     */
    private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;

    /**
     * 生成序列的掩碼,這裡為4095 (0b111111111111=0xfff=4095)
     */
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    /**
     * 工作機器ID(0~63)
     */
    private static long workerId;

    /**
     * 資料中心ID(0~16)
     */
    private long datacenterId;

    /**
     * 毫秒內序列(0~4095)
     */
    private long sequence = 0L;

    /**
     * 上次生成ID的時間截
     */
    private long lastTimestamp = -1L;

    //==============================Constructors=====================================

    /**
     * 建構函式
     *
     * @param workerId     工作ID (0~63)
     * @param datacenterId 資料中心ID (0~15)
     */
    public SnowFlake(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("機器ID必須小於 %d 且大於 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("工作組ID必須小於 %d 且大於 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

    /**
     * 建構函式
     *
     */
    public SnowFlake() {
        this.workerId = 0;
        this.datacenterId = 0;
    }

    /**
     * 獲得下一個ID (該方法是執行緒安全的)
     *
     * @return SnowFlakeId
     */
    public synchronized long nextId() {
        long timestamp = timeGen();

        //如果當前時間小於上一次ID生成的時間戳,說明系統時鐘回退過這個時候應當丟擲異常
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(
                    String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        // 如果是同一時間生成的,則進行毫秒內序列
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            // 毫秒內序列溢位
            if (sequence == 0) {
                // 阻塞到下一個毫秒,獲得新的時間戳
                timestamp = tilNextMillis(lastTimestamp);
            }
        }
        //時間戳改變,毫秒內序列重置
        else {
            sequence = 0L;
        }

        // 上次生成ID的時間截
        lastTimestamp = timestamp;

        // 移位並通過或運算拼到一起組成64位的ID
        return ((timestamp - twepoch) << timestampLeftShift) //
                | (datacenterId << datacenterIdShift) //
                | (workerId << workerIdShift) //
                | sequence;
    }

    /**
     * 阻塞到下一個毫秒,直到獲得新的時間戳
     *
     * @param lastTimestamp 上次生成ID的時間截
     * @return 當前時間戳
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * 返回以毫秒為單位的當前時間
     *
     * @return 當前時間(毫秒)
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

    public long getWorkerId() {
        return workerId;
    }

    public static void setWorkerId(long workerId) {
        SnowFlake.workerId = workerId;
    }

    public long getDatacenterId() {
        return datacenterId;
    }

    public void setDatacenterId(long datacenterId) {
        this.datacenterId = datacenterId;
    }
}

Redis 配置

public class RedisConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port:6379}")
    private Integer port;

    @Value("${spring.redis.password:-1}")
    private String password;

    @Bean
    public JedisPool jedisPool() {
        // 1.設定連線池的配置物件
        JedisPoolConfig config = new JedisPoolConfig();
        // 設定池中最大連線數
        config.setMaxTotal(50);
        // 設定空閒時池中保有的最大連線數
        config.setMaxIdle(10);
        config.setMaxWaitMillis(3000L);
        config.setTestOnBorrow(true);
        log.info(password);
        // 2.設定連線池物件
        if("-1".equals(password)){
            log.info("Redis不通過密碼連線");
            return new JedisPool(config, host, port,0);
        } else {
            log.info("Redis通過密碼連線" + password);
            return new JedisPool(config, host, port,0, password);
        }
    }
}

使用方法

  1. 專案中引入 Redis 、 Jedis 依賴
  2. 複製上面兩個類到專案until包下
  3. application.yml 配置服務名稱,機器序號,Redis賬號,密碼
  4. 配置Jedis,使得專案啟動時池中有Redis連線物件
  5. 啟動專案
  6. 在需要生成ID的類中注入
    @Autowired
    private SnowFlake snowFlake;
    // 生產ID
    snowFlake.nextId(); 方法生產ID