雪花算法中機器id保證全局唯一
阿新 • • 發佈:2019-03-09
elong mda druid ping oplog 更新 主鍵 star span
關於分布式id的生成系統, 美團技術團隊之前已經有寫過一篇相關的文章, 詳見 Leaf——美團點評分布式ID生成系統
通常在生產中會用Twitter開源的雪花算法來生成分布式主鍵 雪花算法中的核心就是機器id和數據中心id, 通常來說數據中心id可以在配置文件中配置, 通常一個服務集群可以共用一個配置文件, 而機器id如果也放在配置文件中維護的話, 每個應用就需要一個獨立的配置, 難免也會出現機器id重復的問題
解決方案: 1. 通過啟動參數去指定機器id, 但是這種方式也會有出錯的可能性 2. 每個應用啟動的時候註冊到redis或者zookeeper, 由redis或zookeeper來分配機器id
接下來主要介紹基於redis的實現方式, 一種是註冊的時候設置過期時間, 配置定時器定時去檢查機器id是否過期需要重新分配; 另一種是不設置過期時間, 只依靠在spring容器銷毀的時候去刪除記錄(但是這種方式容易刪除失敗)
實現方式一
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- 日誌包...開始 --> <!-- log配置:Log4j2 + Slf4j --> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> </dependency> <dependency> <!-- 橋接:告訴Slf4j使用Log4j2 --> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> </dependency> <dependency> <!-- 橋接:告訴commons logging使用Log4j2 --> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-jcl</artifactId> <version>2.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> 
<artifactId>slf4j-api</artifactId> </dependency> <!-- 日誌包...結束 --> </dependencies>
redis的配置
/** * redis的配置 * * @author wang.js on 2019/3/8. * @version 1.0 */ @Configuration public class RedisConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port:6379}") private Integer port; @Bean public JedisPool jedisPool() { //1.設置連接池的配置對象 JedisPoolConfig config = new JedisPoolConfig(); //設置池中最大連接數 config.setMaxTotal(50); //設置空閑時池中保有的最大連接數 config.setMaxIdle(10); config.setMaxWaitMillis(3000L); config.setTestOnBorrow(true); //2.設置連接池對象 return new JedisPool(config,host,port); } }
snowflake算法中機器id的獲取
/**
 * Allocates a globally-unique snowflake machine id by registering it in redis.
 *
 * <p>Strategy: hash the local ip into a candidate id, register it in redis with
 * {@code SETNX} and a 1-day TTL, and renew the TTL every 23 hours from a
 * background timer. If the candidate is taken, scan 0..127 for a free slot and
 * retry; if every slot is taken, fall back to the current id (alert hook TODO).
 *
 * <p>NOTE(review): the snowflake implementation in this file uses 5 worker-id
 * bits (max 31), while this class hands out ids up to 127 — confirm the bit
 * widths are meant to match before relying on ids above 31.
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
@Configuration
public class MachineIdConfig {
    @Resource
    private JedisPool jedisPool;
    @Value("${snowflake.datacenter}")
    private Integer dataCenterId;
    @Value("${snowflake.bizType}")
    private String OPLOG_MACHINE_ID_KEY;
    /**
     * The machine id this instance registered (also read by the renewal timer).
     */
    public static Integer machineId;
    /**
     * Local ip address, cached at startup.
     */
    private static String localIp;
    // Jedis expire() takes SECONDS. The old value 60*60*24*1000 was ~1000 days;
    // one day is correct because the timer renews every 23 hours.
    private static final int MACHINE_ID_TTL_SECONDS = 60 * 60 * 24;
    private static final Logger LOGGER = LoggerFactory.getLogger(MachineIdConfig.class);

    /**
     * Resolves the local host's ip address.
     *
     * @return dotted-quad ip string
     * @throws UnknownHostException if the local host cannot be resolved
     */
    private String getIPAddress() throws UnknownHostException {
        InetAddress address = InetAddress.getLocalHost();
        return address.getHostAddress();
    }

    /**
     * Derives an initial machine id from the local ip, registers it, and builds
     * the snowflake generator bean.
     *
     * @return the configured {@link SnowFlakeGenerator}
     */
    @Bean
    public SnowFlakeGenerator initMachineId() throws Exception {
        localIp = getIPAddress();
        Long ip_ = Long.parseLong(localIp.replaceAll("\\.", ""));
        // floorMod keeps the candidate in 0..31 even if hashCode() is negative
        // (plain % could yield a negative machine id).
        machineId = Math.floorMod(ip_.hashCode(), 32);
        // register (or re-register) the machine id in redis
        createMachineId();
        LOGGER.info("初始化 machine_id :{}", machineId);
        return new SnowFlakeGenerator(machineId, dataCenterId);
    }

    /**
     * Removes this instance's registration before the container shuts down.
     */
    @PreDestroy
    public void destroyMachineId() {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.del(OPLOG_MACHINE_ID_KEY + dataCenterId + machineId);
        }
    }

    /**
     * Main entry: obtains and registers a machine id.
     *
     * @return the registered machine id (or a random fallback on redis failure)
     */
    public Integer createMachineId() {
        try {
            // register in redis with a TTL; false means the slot is taken
            if (registerMachine(machineId, localIp)) {
                // start the background TTL-renewal task
                updateExpTimeThread();
                return machineId;
            }
            // all 128 slots occupied: cannot register — alert hook belongs here
            if (!checkIfCanRegister()) {
                return machineId;
            }
            LOGGER.info("createMachineId->ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
            // checkIfCanRegister() left a free slot in machineId; retry with it.
            // (The old code ignored the recursive result and then randomized
            // machineId again, discarding the id it had just registered.)
            return createMachineId();
        } catch (Exception e) {
            // redis unreachable: fall back to a random id rather than failing startup
            LOGGER.error("createMachineId failed, falling back to a random machine id", e);
            getRandomMachineId();
            return machineId;
        }
    }

    /**
     * Checks whether any machine-id slot in 0..127 is still free; if one is,
     * stores it in {@link #machineId} as the next candidate.
     *
     * @return true if a free slot exists
     */
    private Boolean checkIfCanRegister() {
        Boolean flag = true;
        try (Jedis jedis = jedisPool.getResource()) {
            for (int i = 0; i <= 127; i++) {
                flag = jedis.exists(OPLOG_MACHINE_ID_KEY + dataCenterId + i);
                // key absent: slot i is free — take it as the candidate
                if (!flag) {
                    machineId = i;
                    break;
                }
            }
        }
        return !flag;
    }

    /**
     * Starts a timer that renews the registration TTL every 23 hours.
     * Before renewing it verifies the cached ip still belongs to this host;
     * if not, it acquires a fresh machine id and updates the generator.
     */
    private void updateExpTimeThread() {
        // daemon timer so it cannot keep the JVM alive on shutdown
        new Timer(localIp, true).schedule(new TimerTask() {
            @Override
            public void run() {
                Boolean b = checkIsLocalIp(String.valueOf(machineId));
                if (b) {
                    LOGGER.info("更新超時時間 ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
                    try (Jedis jedis = jedisPool.getResource()) {
                        jedis.expire(OPLOG_MACHINE_ID_KEY + dataCenterId + machineId, MACHINE_ID_TTL_SECONDS);
                    }
                } else {
                    LOGGER.info("重新生成機器ID ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
                    // another host owns this id now: pick a new one and re-register
                    getRandomMachineId();
                    createMachineId();
                    // propagate the new id into the snowflake generator
                    SnowFlakeGenerator.setWorkerId(machineId);
                    LOGGER.info("Timer->thread->name:{}", Thread.currentThread().getName());
                    // stop this task; createMachineId() scheduled a fresh one
                    this.cancel();
                }
            }
        }, 10 * 1000, 1000 * 60 * 60 * 23);
    }

    /**
     * Picks a random machine id in 0..127 (the full key space checked by
     * {@link #checkIfCanRegister()}).
     */
    public void getRandomMachineId() {
        machineId = (int) (Math.random() * 128);
    }

    /**
     * Advances the machine id sequentially, wrapping 127 back to 0.
     */
    public void incMachineId() {
        if (machineId >= 127) {
            machineId = 0;
        } else {
            machineId += 1;
        }
    }

    /**
     * Checks whether the ip registered under the given machine id is this host.
     *
     * @param mechineId machine id to look up
     * @return true if the cached ip equals the local ip
     */
    private Boolean checkIsLocalIp(String mechineId) {
        try (Jedis jedis = jedisPool.getResource()) {
            String ip = jedis.get(OPLOG_MACHINE_ID_KEY + dataCenterId + mechineId);
            LOGGER.info("checkIsLocalIp->ip:{}", ip);
            return localIp.equals(ip);
        }
    }

    /**
     * Registers the machine id in redis and sets its TTL.
     *
     * <p>Uses {@code SETNX} so an id already held by another instance is never
     * silently overwritten (the previous {@code set(...)} always succeeded and
     * returned true, which made the retry logic unreachable).
     *
     * @param machineId candidate id in 0..127
     * @param localIp   this host's ip, stored as the slot owner
     * @return true if the slot was acquired or already belongs to this host
     */
    private Boolean registerMachine(Integer machineId, String localIp) throws Exception {
        try (Jedis jedis = jedisPool.getResource()) {
            String key = OPLOG_MACHINE_ID_KEY + dataCenterId + machineId;
            Long acquired = jedis.setnx(key, localIp);
            if (acquired > 0) {
                jedis.expire(key, MACHINE_ID_TTL_SECONDS);
                return true;
            }
            // slot exists: it is still ours if the stored ip matches (restart case)
            return localIp.equals(jedis.get(key));
        }
    }
}
雪花算法(雪花算法百度上很多, 自己可以隨便找一個)
/**
 * Twitter snowflake id generator.
 *
 * <p>64-bit id layout: 41 bits of millisecond timestamp (offset from a custom
 * epoch), 5 bits datacenter id, 5 bits worker id, 12 bits per-millisecond
 * sequence. {@link #nextId()} is synchronized and monotonically increasing;
 * it throws if the system clock moves backwards.
 *
 * <p>NOTE(review): {@code workerId} is static, so all instances in the JVM
 * share one worker id — confirm a single generator per JVM is intended.
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
public class SnowFlakeGenerator {
    /** Custom epoch (Twitter's original snowflake epoch, 2010-11-04). */
    private final long twepoch = 1288834974657L;
    private final long workerIdBits = 5L;
    private final long datacenterIdBits = 5L;
    /** Max value for 5 bits: 31. */
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
    private final long sequenceBits = 12L;
    private final long workerIdShift = sequenceBits;
    private final long datacenterIdShift = sequenceBits + workerIdBits;
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);
    private static long workerId;
    private long datacenterId;
    private long sequence = 0L;
    private long lastTimestamp = -1L;

    /**
     * @param actualWorkId worker id, must be in [0, 31]
     * @param datacenterId datacenter id, must be in [0, 31]
     * @throws IllegalArgumentException if either id is out of range
     */
    public SnowFlakeGenerator(long actualWorkId, long datacenterId) {
        if (actualWorkId > maxWorkerId || actualWorkId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can‘t be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can‘t be greater than %d or less than 0", maxDatacenterId));
        }
        workerId = actualWorkId;
        this.datacenterId = datacenterId;
    }

    /**
     * Replaces the shared worker id at runtime (used when the machine id is
     * re-allocated). Validates the range exactly like the constructor does;
     * previously an out-of-range id was accepted silently and corrupted the
     * bit layout, producing colliding ids.
     *
     * @param workerId new worker id, must be in [0, 31]
     * @throws IllegalArgumentException if the id is out of range
     */
    public static void setWorkerId(long workerId) {
        // 5 worker-id bits -> valid range [0, 31]
        long maxId = -1L ^ (-1L << 5L);
        if (workerId > maxId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can‘t be greater than %d or less than 0", maxId));
        }
        SnowFlakeGenerator.workerId = workerId;
    }

    /**
     * Generates the next unique id.
     *
     * @return a strictly increasing 64-bit id
     * @throws RuntimeException if the clock moved backwards
     */
    public synchronized long nextId() {
        long timestamp = timeGen();
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }
        if (lastTimestamp == timestamp) {
            // same millisecond: bump the sequence; on overflow spin to the next ms
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            sequence = 0L;
        }
        lastTimestamp = timestamp;
        return ((timestamp - twepoch) << timestampLeftShift) | (datacenterId << datacenterIdShift) | (workerId << workerIdShift) | sequence;
    }

    /**
     * Busy-waits until the clock passes {@code lastTimestamp}.
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /** Current time in milliseconds (overridable for tests). */
    protected long timeGen() {
        return System.currentTimeMillis();
    }
}
測試的controller
/**
 * REST endpoint exposing snowflake-generated distributed ids.
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
@RestController
@RequestMapping("/snowflake")
public class SnowflakeController {

    @Resource
    private SnowFlakeGenerator snowFlakeGenerator;

    /**
     * Hands out the next distributed primary key.
     *
     * @return a fresh snowflake id
     */
    @GetMapping("/get")
    public long getDistributeId() {
        long id = snowFlakeGenerator.nextId();
        return id;
    }
}
配置文件
server:
port: 12892
spring:
redis:
database: 0
host: mini7
lettuce:
pool:
max-active: 8
max-idle: 8
max-wait: -1
min-idle: 0
port: 6379
timeout: 10000
snowflake:
datacenter: 1 # 數據中心的id
bizType: order_id_ # 業務類型
實現方式二
機器id註冊到redis的時候, 不設置過期時間 同時采用sharding-jdbc的分布式主鍵生成組件
maven依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.10.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<sharding-sphere.version>3.0.0.M4</sharding-sphere.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--sharding-jdbc依賴開始-->
<!-- for spring boot -->
<dependency>
<groupId>io.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
<version>${sharding-sphere.version}</version>
</dependency>
<!-- for spring namespace -->
<dependency>
<groupId>io.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-namespace</artifactId>
<version>${sharding-sphere.version}</version>
</dependency>
<!--sharding-jdbc依賴結束-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.41</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.0</version>
</dependency>
<!-- 日誌包...開始 -->
<!-- log配置:Log4j2 + Slf4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency> <!-- 橋接:告訴Slf4j使用Log4j2 -->
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency> <!-- 橋接:告訴commons logging使用Log4j2 -->
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-jcl</artifactId>
<version>2.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<!-- 日誌包...結束 -->
</dependencies>
配置文件
server:
port: 12893
# sharding-jdbc分庫分表的配置
sharding:
jdbc:
datasource:
ds0:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/ds0
username: root
password: 123456
names: ds0
spring:
redis:
database: 0
host: mini7
lettuce:
pool:
max-active: 8
max-idle: 8
max-wait: -1
min-idle: 0
port: 6379
timeout: 10000
snowflake:
datacenter: 1 # 數據中心的id
bizType: sharding_jdbc_id_ # 業務類型
redis的配置
/**
 * Redis (Jedis) pool configuration shared by the workerId registry.
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
@Configuration
public class RedisConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port:6379}")
    private Integer port;

    /**
     * Creates the application-wide pool: 50 connections max, 10 idle,
     * 3-second borrow timeout, validate-on-borrow enabled.
     *
     * @return configured {@link JedisPool}
     */
    @Bean
    public JedisPool jedisPool() {
        // pool tuning
        JedisPoolConfig cfg = new JedisPoolConfig();
        cfg.setMaxTotal(50);
        cfg.setMaxIdle(10);
        cfg.setMaxWaitMillis(3000L);
        cfg.setTestOnBorrow(true);
        // pool bound to the configured host/port
        return new JedisPool(cfg, host, port);
    }
}
機器id的配置
/**
 * Guarantees a globally-unique snowflake workerId via redis registration.
 *
 * <p>No TTL is used here (implementation two): the key is created with
 * {@code SETNX} at startup and deleted in {@code @PreDestroy}. If the JVM dies
 * without running the destroy hook, the key must be cleaned up manually.
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
@Component
public class WorkerIdConfig {
    @Resource
    private JedisPool jedisPool;
    @Value("${snowflake.datacenter}")
    private Integer dataCenterId;
    @Value("${snowflake.bizType}")
    private String bizType;
    /**
     * The worker id registered by this instance (0..1023).
     */
    private int workerId;
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkerIdConfig.class);

    /**
     * Derives a workerId from the local ip and registers it in redis.
     *
     * @return the registered worker id in 0..1023
     * @throws UnknownHostException if the local ip cannot be resolved
     * @throws RuntimeException     if the id is already held by another host
     */
    public int getWorkerId() throws UnknownHostException {
        String ipAddress = getIPAddress();
        Long ip = Long.parseLong(ipAddress.replaceAll("\\.", ""));
        // floorMod keeps the id in 0..1023 even when hashCode() is negative;
        // plain % could produce a negative workerId, which sharding-jdbc's
        // DefaultKeyGenerator.setWorkerId precondition (0 <= id < 1024) rejects.
        workerId = Math.floorMod(ip.hashCode(), 1024);
        try (Jedis jedis = jedisPool.getResource()) {
            // SETNX: only the first registrant of this slot succeeds
            Long setnx = jedis.setnx(bizType + dataCenterId + workerId, ipAddress);
            if (setnx > 0) {
                return workerId;
            } else {
                // slot taken: still ours if the stored ip is this host (restart)
                String cacheIp = jedis.get(bizType + dataCenterId + workerId);
                if (ipAddress.equalsIgnoreCase(cacheIp)) {
                    return workerId;
                }
            }
            throw new RuntimeException("機器id:" + workerId + "已經存在, 請先清理緩存");
        }
    }

    /**
     * Deletes this instance's registration on container shutdown.
     *
     * @throws RuntimeException if the key was no longer present
     */
    @PreDestroy
    public void delWorkerId() {
        LOGGER.info("開始銷毀機器id:" + workerId);
        try (Jedis jedis = jedisPool.getResource()) {
            Long del = jedis.del(bizType + dataCenterId + workerId);
            if (del == 0) {
                throw new RuntimeException("機器id:" + workerId + "刪除失敗");
            }
        }
    }

    /**
     * Resolves the local host's ip address.
     *
     * @return dotted-quad ip string
     * @throws UnknownHostException if the local host cannot be resolved
     */
    private String getIPAddress() throws UnknownHostException {
        InetAddress address = InetAddress.getLocalHost();
        return address.getHostAddress();
    }
}
sharding-jdbc分布式主鍵生成的配置
/**
 * Wires sharding-jdbc's distributed key generator with the registered workerId.
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
@Configuration
public class ShardingIdConfig {

    @Resource
    private WorkerIdConfig workerIdConfig;

    /**
     * Builds the key-generator bean; the worker id must stay below 1024
     * (sharding-jdbc precondition).
     *
     * @return the configured {@link DefaultKeyGenerator}
     * @throws UnknownHostException if the local ip cannot be resolved
     */
    @Bean
    public DefaultKeyGenerator defaultKeyGenerator() throws UnknownHostException {
        int workerId = workerIdConfig.getWorkerId();
        DefaultKeyGenerator.setWorkerId(workerId);
        return new DefaultKeyGenerator();
    }
}
測試的controller
/**
 * REST endpoint returning sharding-jdbc distributed primary keys.
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
@RequestMapping("/id")
@RestController
public class GenIdController {

    @Resource
    private DefaultKeyGenerator generator;

    /**
     * @return the next distributed key as a long
     */
    @GetMapping("/get")
    public long get() {
        Number key = generator.generateKey();
        return key.longValue();
    }
}
sharding-jdbc的DefaultKeyGenerator中的源碼中可以看到workerId必須滿足 0 <= workerId < 1024, 即最大可用的workerId是1023
// Excerpt quoted from sharding-jdbc's DefaultKeyGenerator: workerId must satisfy 0 <= workerId < 1024.
public static void setWorkerId(long workerId) {
Preconditions.checkArgument(workerId >= 0L && workerId < 1024L);
// NOTE(review): as transcribed here, `workerId = workerId` assigns the parameter
// to itself (a no-op). The actual library source assigns the static field
// (DefaultKeyGenerator.workerId = workerId) — verify against the real source.
workerId = workerId;
}
雪花算法中機器id保證全局唯一