分布式ID生成方案

分布式系统中我们对ID生成器要求又有哪些呢?

  • 全局唯一性:不能出现重复的ID号,既然是唯一标识,这是最基本的要求。

  • 递增:比较低要求的条件为趋势递增,即保证下一个ID一定大于上一个ID,而比较苛刻的要求是连续递增,如1,2,3等等。

  • 高可用高性能:ID生成事关重大,一旦挂掉系统崩溃;高性能是指必须要在压测下表现良好,如果达不到要求则在高并发环境下依然会导致系统瘫痪。

UUID/GUID(通用唯一识别码)

UUID是通用唯一识别码(Universally Unique Identifier)的缩写,开放软件基金会(OSF)。
由以下几部分的组合:

1、当前日期和时间
2、时钟序列
3、全局唯一的IEEE机器识别号(如果有网卡,从网卡获得,没有网卡以其他方式获得)

UUID是由128位二进制组成,一般转换成十六进制,然后用String表示。

示例UUID,长度为36的字符串:4cdbc040-657a-4847-b266-7e31d9e2c3d9

优点

1、通过本地生成,没有经过网络I/O,性能较快

2、能够保证独立性,程序可以在不同的数据库间迁移,效果不受影响。

3、保证生成的ID不仅是表独立的,而且是库独立的,这点在你想切分数据库的时候尤为重要。

缺点

1、UUID太长,通常以36长度的字符串表示,空间占用大。

2、UUID无业务含义:很多需要ID能标识业务含义的地方不使用。

3、不满足递增要求,对MySQL索引不利:如果作为数据库主键,在InnoDB引擎下,UUID的无序性可能会引起数据位置频繁变动,严重影响性能。

适用场景

UUID的适用场景可以为不担心过多的空间占用,以及不需要生成有递增趋势的数字。在Log4j里面他在UuidPatternConverter中加入了UUID来标识每一条日志。

数据库主键自增

用数据库生成ID是最常见的方案。能够确保ID全数据库唯一。

MySQL使用AUTO_INCREMENT、Oracle使用Sequence序列

集群环境下,不同的库,设置不同的初始值,每次自增加100

MySQL下修改起点和步长的方式

1
2
3
4
5
6

set @@auto_increment_offset=1; -- 设置起点

set @@auto_increment_increment=100; -- 设置步长为100

show variables like 'auto_inc%'; -- 查看参数

优点

简单方便,有序递增,方便排序和分页

缺点

  • 分库分表会带来问题,需要进行改造。

  • 并发性能不高,受限于数据库的性能。

  • 数据库宕机服务不可用。

适用场景

根据上面可以总结出来,当数据量不多,并发性能不高的时候这个很适合,比如一些to B的业务,商家注册这些,商家注册和用户注册不是一个数量级的,所以可以数据库主键递增。如果对顺序递增强依赖,那么也可以使用数据库主键自增。

优化方案

针对主库单点,如果有多个Master库,则每个Master库设置的起始数字不一样,步长一样,可以是Master的个数。比如:Master1 生成的是 1,4,7,10,Master2生成的是2,5,8,11 Master3生成的是 3,6,9,12。这样就可以有效生成集群中的唯一ID,也可以大大降低ID生成数据库操作的负载。

基于Redis自增

Redis的 incr(key) API 用于将key的值进行递增,并返回增长数值。如果key不存在,则创建并赋值为0。

利用Redis的特性:单线程原子操作、自增计数API、数据有效期机制 EX。
示例:

1、业务编码 + 地区 + 自增数值。 (9 020 00000000001)

优点

1、性能比数据库好,能满足有序递增。

2、拓展性强,可以方便的结合业务进行处理;

3、利用Redis操作原子性的特性,保证在并发的时候不会重复;

缺点

1、引入Redis就意味着引入其他第三方的依赖;

2、增加一次网络开销;

3、需要对Redis服务实现高可用;

4、 由于redis是内存的KV数据库,即使有AOF和RDB,但是依然会存在数据丢失,有可能会造成ID重复。

适用场景

由于其性能比数据库好,但是有可能会出现ID重复和不稳定,这一块如果可以接受那么就可以使用。也适用于到了某个时间,比如每天都刷新ID,那么这个ID就需要重置,通过(Incr Today),每天都会从0开始加。

ZooKeeper生成ID

实际业务中,除了分布式ID全局唯一之外,还有是否趋势/连续递增的要求。根据具体业务需求的不同,有两种可选方案。

一是只保证全局唯一,不保证连续递增。二是既保证全局唯一,又保证连续递增。

基于ZooKeeper和本地缓存的方案

基于zookeeper分布式ID实现方案有很多种,本方案只使用ZooKeeper作为分段节点协调工具。每台服务器首先从zookeeper缓存一段,如1-1000的id。

此时zk上保存最大值1000,每次获取的时候都会进行判断,如果id小于本地最大值,即id<=1000,则更新本地的当前值,如果id大于本地当前值,比如说是1001,则会将从zk再获取下一个id数据段并在本地缓存。获取数据段的时候需要更新zk节点数据,更新的时候使用curator的分布式锁来实现。

由于id是从本机获取,因此本方案的优点是性能非常好。缺点是如果多主机负载均衡,则会出现不连续的id,当然将递增区段设置为1也能保证连续的id,但是效率会受到很大影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* 根据开源项目mycat实现基于zookeeper的递增序列号
* <p>
* 只要配置好ZK地址和表名的如下属性
* MINID 某线程当前区间内最小值
* MAXID 某线程当前区间内最大值
* CURID 某线程当前区间内当前值
*
* @author wangwanbin
* @version 1.0
* @time 2017/9/1
*/
public class ZKCachedSequenceHandler extends SequenceHandler {
protected static final Logger LOGGER = LoggerFactory.getLogger(ZKCachedSequenceHandler.class);
private static final String KEY_MIN_NAME = ".MINID";// 1
private static final String KEY_MAX_NAME = ".MAXID";// 10000
private static final String KEY_CUR_NAME = ".CURID";// 888
private final static long PERIOD = 1000;//每次缓存的ID段数量
private static ZKCachedSequenceHandler instance = new ZKCachedSequenceHandler();

/**
* 私有化构造方法,单例模式
*/
private ZKCachedSequenceHandler() {
}

/**
* 获取sequence工具对象的唯一方法
*
* @return
*/
public static ZKCachedSequenceHandler getInstance() {
return instance;
}

private Map<String, Map<String, String>> tableParaValMap = null;

private CuratorFramework client;
private InterProcessSemaphoreMutex interProcessSemaphore = null;

public void loadZK() {
try {
this.client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(10003));
this.client.start();
} catch (Exception e) {
LOGGER.error("Error caught while initializing ZK:" + e.getCause());
}
}

public Map<String, String> getParaValMap(String prefixName) {
if (tableParaValMap == null) {
try {
loadZK();
fetchNextPeriod(prefixName);
} catch (Exception e) {
LOGGER.error("Error caught while loding configuration within current thread:" + e.getCause());
}
}
Map<String, String> paraValMap = tableParaValMap.get(prefixName);
return paraValMap;
}

public Boolean fetchNextPeriod(String prefixName) {
try {
Stat stat = this.client.checkExists().forPath(PATH + "/" + prefixName + SEQ);

if (stat == null || (stat.getDataLength() == 0)) {
try {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.forPath(PATH + "/" + prefixName + SEQ, String.valueOf(0).getBytes());
} catch (Exception e) {
LOGGER.debug("Node exists! Maybe other instance is initializing!");
}
}
if (interProcessSemaphore == null) {
interProcessSemaphore = new InterProcessSemaphoreMutex(client, PATH + "/" + prefixName + SEQ);
}
interProcessSemaphore.acquire();
if (tableParaValMap == null) {
tableParaValMap = new ConcurrentHashMap<>();
}
Map<String, String> paraValMap = tableParaValMap.get(prefixName);
if (paraValMap == null) {
paraValMap = new ConcurrentHashMap<>();
tableParaValMap.put(prefixName, paraValMap);
}
long now = Long.parseLong(new String(client.getData().forPath(PATH + "/" + prefixName + SEQ)));
client.setData().forPath(PATH + "/" + prefixName + SEQ, ((now + PERIOD) + "").getBytes());
if (now == 1) {
paraValMap.put(prefixName + KEY_MAX_NAME, PERIOD + "");
paraValMap.put(prefixName + KEY_MIN_NAME, "1");
paraValMap.put(prefixName + KEY_CUR_NAME, "0");
} else {
paraValMap.put(prefixName + KEY_MAX_NAME, (now + PERIOD) + "");
paraValMap.put(prefixName + KEY_MIN_NAME, (now) + "");
paraValMap.put(prefixName + KEY_CUR_NAME, (now) + "");
}
} catch (Exception e) {
LOGGER.error("Error caught while updating period from ZK:" + e.getCause());
} finally {
try {
interProcessSemaphore.release();
} catch (Exception e) {
LOGGER.error("Error caught while realeasing distributed lock" + e.getCause());
}
}
return true;
}

public Boolean updateCURIDVal(String prefixName, Long val) {
Map<String, String> paraValMap = tableParaValMap.get(prefixName);
if (paraValMap == null) {
throw new IllegalStateException("ZKCachedSequenceHandler should be loaded first!");
}
paraValMap.put(prefixName + KEY_CUR_NAME, val + "");
return true;
}

/**
* 获取自增ID
*
* @param sequenceEnum
* @return
*/
@Override
public synchronized long nextId(SequenceEnum sequenceEnum) {
String prefixName = sequenceEnum.getCode();
Map<String, String> paraMap = this.getParaValMap(prefixName);
if (null == paraMap) {
throw new RuntimeException("fetch Param Values error.");
}
Long nextId = Long.parseLong(paraMap.get(prefixName + KEY_CUR_NAME)) + 1;
Long maxId = Long.parseLong(paraMap.get(prefixName + KEY_MAX_NAME));
if (nextId > maxId) {
fetchNextPeriod(prefixName);
return nextId(sequenceEnum);
}
updateCURIDVal(prefixName, nextId);
return nextId.longValue();
}

public static void main(String[] args) throws UnsupportedEncodingException {
long startTime = System.currentTimeMillis(); //获取开始时间
final ZKCachedSequenceHandler sequenceHandler = getInstance();
sequenceHandler.loadZK();
new Thread() {
public void run() {
long startTime2 = System.currentTimeMillis(); //获取开始时间
for (int i = 0; i < 5000; i++) {
System.out.println("线程1 " + sequenceHandler.nextId(SequenceEnum.ACCOUNT));
}
long endTime2 = System.currentTimeMillis(); //获取结束时间
System.out.println("程序运行时间1: " + (endTime2 - startTime2) + "ms");
}
}.start();
for (int i = 0; i < 5000; i++) {
System.out.println("线程2 " + sequenceHandler.nextId(SequenceEnum.ACCOUNT));
}
long endTime = System.currentTimeMillis(); //获取结束时间
System.out.println("程序运行时间2: " + (endTime - startTime) + "ms");
}
}

利用zk的永久自增节点策略实现持续递增ID

使用zk的永久sequence策略创建节点,并获取返回值,然后删除前一个节点,这样既防止zk服务器存在过多的节点,又提高了效率;节点删除采用线程池来统一处理,提高响应速度。

优点:能创建连续递增的ID。
关键实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import com.zb.p2p.enums.SequenceEnum;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* 基于zk的永久型自增节点PERSISTENT_SEQUENTIAL实现
* 每次生成节点后会使用线程池执行删除节点任务
* Created by wangwanbin on 2017/9/5.
*/
public class ZKIncreaseSequenceHandler extends SequenceHandler implements PooledObjectFactory<CuratorFramework> {
protected static final Logger LOGGER = LoggerFactory.getLogger(ZKCachedSequenceHandler.class);
private static ZKIncreaseSequenceHandler instance = new ZKIncreaseSequenceHandler();
private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
private GenericObjectPool genericObjectPool;
private Queue<Long> preNodes = new ConcurrentLinkedQueue<>();
private static String ZK_ADDRESS = ""; //192.168.0.65
private static String PATH = "";// /sequence/p2p
private static String SEQ = "";//seq;

/**
* 私有化构造方法,单例模式
*/
private ZKIncreaseSequenceHandler() {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(4);
genericObjectPool = new GenericObjectPool(this, config);
}

/**
* 获取sequence工具对象的唯一方法
*
* @return
*/
public static ZKIncreaseSequenceHandler getInstance(String zkAddress, String path, String seq) {
ZK_ADDRESS = zkAddress;
PATH = path;
SEQ = seq;
return instance;
}

@Override
public long nextId(final SequenceEnum sequenceEnum) {
String result = createNode(sequenceEnum.getCode());
final String idstr = result.substring((PATH + "/" + sequenceEnum.getCode() + "/" + SEQ).length());
final long id = Long.parseLong(idstr);
preNodes.add(id);
//删除上一个节点
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
Iterator<Long> iterator = preNodes.iterator();
if (iterator.hasNext()) {
long preNode = iterator.next();
if (preNode < id) {
final String format = "%0" + idstr.length() + "d";
String preIdstr = String.format(format, preNode);
final String prePath = PATH + "/" + sequenceEnum.getCode() + "/" + SEQ + preIdstr;
CuratorFramework client = null;
try {
client = (CuratorFramework) genericObjectPool.borrowObject();
client.delete().forPath(prePath);
preNodes.remove(preNode);
} catch (Exception e) {
LOGGER.error("delete preNode error", e);
} finally {
if (client != null)
genericObjectPool.returnObject(client);
}
}
}
}
});
return id;
}

private String createNode(String prefixName) {
CuratorFramework client = null;
try {
client = (CuratorFramework) genericObjectPool.borrowObject();
String result = client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
.forPath(PATH + "/" + prefixName + "/" + SEQ, String.valueOf(0).getBytes());
return result;
} catch (Exception e) {
throw new RuntimeException("create zookeeper node error", e);
} finally {
if (client != null)
genericObjectPool.returnObject(client);
}
}

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis(); //获取开始时间
final ZKIncreaseSequenceHandler sequenceHandler = ZKIncreaseSequenceHandler.getInstance("192.168.0.65""/sequence/p2p""seq");
int count = 10;
final CountDownLatch cd = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
executorService.execute(new Runnable() {
public void run() {
System.out.printf("线程 %s %d \n", Thread.currentThread().getId(), sequenceHandler.nextId(SequenceEnum.ORDER));
cd.countDown();
}
});
}
try {
cd.await();
} catch (InterruptedException e) {
LOGGER.error("Interrupted thread",e);
Thread.currentThread().interrupt();
}
long endTime = System.currentTimeMillis(); //获取结束时间
System.out.println("程序运行时间: " + (endTime - startTime) + "ms");

}

@Override
public PooledObject<CuratorFramework> makeObject() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(10003));
client.start();
return new DefaultPooledObject<>(client);
}

@Override
public void destroyObject(PooledObject<CuratorFramework> p) throws Exception {

}

@Override
public boolean validateObject(PooledObject<CuratorFramework> p) {
return false;
}

@Override
public void activateObject(PooledObject<CuratorFramework> p) throws Exception {

}

@Override
public void passivateObject(PooledObject<CuratorFramework> p) throws Exception {

}
}

雪花算法-Snowflake

snowflake是Twitter开源的分布式ID生成算法,结果是一个long型的ID。其核心思想是:使用41bit作为毫秒数,10bit作为机器的ID(5个bit是数据中心,5个bit的机器ID),12bit作为毫秒内的流水号(意味着每个节点在每毫秒可以产生 4096 个 ID),最后还有一个符号位,永远是0。

  • 1bit:一般是符号位,不做处理

  • 41bit:用来记录时间戳,这里可以记录69年,如果设置好起始时间比如今年是2018年,那么可以用到2089年,到时候怎么办?要是这个系统能用69年,我相信这个系统早都重构了好多次了。

  • 10bit:10bit用来记录机器ID,总共可以记录1024台机器,一般用前5位代表数据中心,后面5位是某个数据中心的机器ID

  • 12bit:循环位,用来对同一个毫秒之内产生不同的ID,12位可以最多记录4095个,也就是在同一个机器同一毫秒最多记录4095个,多余的需要进行等待下毫秒。

上面四部分加起来是 64比特位 = 8字节 = Long 。(转换成字符串后长度最多19)。

上面只是一个将64bit划分的标准,当然也不一定这么做,可以根据不同业务的具体场景来划分,比如下面给出一个业务场景:

  • 服务目前QPS10万,预计几年之内会发展到百万。

  • 当前机器三地部署,上海,北京,深圳都有。

  • 当前机器10台左右,预计未来会增加至百台。

这个时候我们根据上面的场景可以再次合理的划分62bit,QPS几年之内会发展到百万,那么每毫秒就是千级的请求,目前10台机器那么每台机器承担百级的请求,为了保证扩展,后面的循环位可以限制到1024,也就是2^10,那么循环位10位就足够了。

机器三地部署我们可以用3bit,总共8来表示机房位置,当前的机器10台,为了保证扩展到百台那么可以用7bit 128来表示,时间位依然是41bit,那么还剩下64-10-3-7-41-1 = 2bit,还剩下2bit可以用来进行扩展。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/**
* Twitter_Snowflake<br>
* SnowFlake的结构如下(每部分用-分开):<br>
* 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 <br>
* 1位标识,由于long基本类型在Java中是带符号的,最高位是符号位,正数是0,负数是1,所以id一般是正数,最高位是0<br>
* 41位时间截(毫秒级),注意,41位时间截不是存储当前时间的时间截,而是存储时间截的差值(当前时间截 - 开始时间截)
* 得到的值),这里的的开始时间截,一般是我们的id生成器开始使用的时间,由我们程序来指定的(如下下面程序IdWorker类的startTime属性)。41位的时间截,可以使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69<br>
* 10位的数据机器位,可以部署在1024个节点,包括5位datacenterId和5位workerId<br>
* 12位序列,毫秒内的计数,12位的计数顺序号支持每个节点每毫秒(同一机器,同一时间截)产生4096个ID序号<br>
* 加起来刚好64位,为一个Long型。<br>
* SnowFlake的优点是,整体上按照时间自增排序,并且整个分布式系统内不会产生ID碰撞(由数据中心ID和机器ID作区分),并且效率较高,经测试,SnowFlake每秒能够产生26万ID左右。
*/
public class SnowflakeIdWorker {

// ==============================Fields===========================================
/** 开始时间截 (2015-01-01) */
private final long twepoch = 1420041600000L;

/** 机器id所占的位数 */
private final long workerIdBits = 5L;

/** 数据标识id所占的位数 */
private final long datacenterIdBits = 5L;

/** 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) */
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

/** 支持的最大数据标识id,结果是31 */
private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);

/** 序列在id中占的位数 */
private final long sequenceBits = 12L;

/** 机器ID向左移12位 */
private final long workerIdShift = sequenceBits;

/** 数据标识id向左移17位(12+5) */
private final long datacenterIdShift = sequenceBits + workerIdBits;

/** 时间截向左移22位(5+5+12) */
private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;

/** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) */
private final long sequenceMask = -1L ^ (-1L << sequenceBits);

/** 工作机器ID(0~31) */
private long workerId;

/** 数据中心ID(0~31) */
private long datacenterId;

/** 毫秒内序列(0~4095) */
private long sequence = 0L;

/** 上次生成ID的时间截 */
private long lastTimestamp = -1L;

//==============================Constructors=====================================
/**
* 构造函数
* @param workerId 工作ID (0~31)
* @param datacenterId 数据中心ID (0~31)
*/
public SnowflakeIdWorker(long workerId, long datacenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;
}

// ==============================Methods==========================================
/**
* 获得下一个ID (该方法是线程安全的)
* @return SnowflakeId
*/
public synchronized long nextId() {
long timestamp = timeGen();

//如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
if (timestamp < lastTimestamp) {
throw new RuntimeException(
String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}

//如果是同一时间生成的,则进行毫秒内序列
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
//毫秒内序列溢出
if (sequence == 0) {
//阻塞到下一个毫秒,获得新的时间戳
timestamp = tilNextMillis(lastTimestamp);
}
}
//时间戳改变,毫秒内序列重置
else {
sequence = 0L;
}

//上次生成ID的时间截
lastTimestamp = timestamp;

//移位并通过或运算拼到一起组成64位的ID
return ((timestamp - twepoch) << timestampLeftShift) //
| (datacenterId << datacenterIdShift) //
| (workerId << workerIdShift) //
| sequence;
}

/**
* 阻塞到下一个毫秒,直到获得新的时间戳
* @param lastTimestamp 上次生成ID的时间截
* @return 当前时间戳
*/
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}

/**
* 返回以毫秒为单位的当前时间
* @return 当前时间(毫秒)
*/
protected long timeGen() {
return System.currentTimeMillis();
}

//==============================Test=============================================
/** 测试 */
public static void main(String[] args) {
SnowflakeIdWorker idWorker = new SnowflakeIdWorker(00);
for (int i = 0; i < 1000; i++) {
long id = idWorker.nextId();
System.out.println(Long.toBinaryString(id));
System.out.println(id);
}
}
}

优点

  • 性能较优,速度快;

  • 无需第三方依赖,实现也很简单;

  • 可以根据实际情况调整和拓展算法,方便灵活;

缺点

  • 依赖机器时间,如果发生回拨会导致可能生成ID重复。业界使用一般是基于雪花算法进行拓展;

适用场景

当我们需要无法被猜测的ID,并且需要一定高性能,且需要long型,那么就可以使用我们雪花算法。比如常见的订单ID,用雪花算法别人无法猜测你每天的订单量是多少。

问题分析

1、为什么bit位置利用了63位?
因为long在java中占8字节,每字节8bit,一共64bit,其中有1个bit位是符号位不能用做生成ID,如果符号位也用来做ID中的1个bit为会导致ID出现负数,影响趋势递增特性。

2、雪花算法需要用单例方式生成ID
  因为雪花算法会依赖上一次生成的ID的时间来判断是否需要对序列号进行增加的操作,如果不是单例,两个业务用两个对象同时获取ID,则可能会生成相同的ID

3、机器位怎么取值?

  • 主机唯一标识
    如果线上机器有唯一标识,可以采用这种方式。我们公司运维平台提供了每台机器的唯一标识,不过提供的数值比较大,所以更改了机器位长度为22位。当当的sharding-jdbc也有类似的方法。例如,机器的 HostName 为: dangdang-db-sharding-dev-01(公司名-部门名-服务名-环境名-编号),会截取 HostName 最后的编号 01 作为机器位。

  • 可根据ip进行计算
     如果能保证不同机房的机器ip不重复,可以利用ip来计算机器位,IP最大 255.255.255.255。而(255+255+255+255) < 1024,因此采用IP段数值相加即可生成机器位,不受IP位限制。不过这种方式也不是绝对ok,要根据自身情况在选择,比如10.0.5.2 与 10.0.2.5 计算出来也是相同的。使用这种IP生成机器位的方法,必须保证IP段相加不能重复

时钟回拨问题

原因

1、人为操作,在真实环境一般不会有那个傻逼干这种事情,所以基本可以排除。
2、由于有些业务等需要,机器需要同步时间服务器(在这个过程中可能会存在时间回拨,查了下我们服务器一般在10ms以内(2小时同步一次)。

解决

雪花算法是强依赖我们的时间的,如果时间发生回拨,有可能会生成重复的ID,在我们上面的nextId中我们用当前时间和上一次的时间进行判断,如果当前时间小于上一次的时间那么肯定是发生了回拨,普通的算法会直接抛出异常,这里我们可以对其进行优化,一般分为两个情况:

  • 如果时间回拨时间较短,比如配置5ms以内,那么可以直接等待一定的时间,让机器的时间追上来。

  • 如果时间的回拨时间较长,我们不能接受这么长的阻塞等待,那么又有两个策略:

1、直接拒绝,抛出异常,打日志,通知RD时钟回滚。

2、利用扩展位,上面我们讨论过不同业务场景位数可能用不到那么多,那么我们可以把扩展位数利用起来了,比如当这个时间回拨比较长的时候,我们可以不需要等待,直接在扩展位加1。2位的扩展位允许我们有3次大的时钟回拨,一般来说就够了,如果其超过三次我们还是选择抛出异常,打日志。

另外一种解决方案:
由于是分布在各个机器自己上面,如果要几台集中的机器(并且不做时间同步),那么就基本上就不存在回拨可能性了(曲线救国也是救国,哈哈),但是也的确带来了新问题,各个结点需要访问集中机器,要保证性能,百度的uid-generator产生就是基于这种情况做的(每次取一批回来,很好的思想,性能也非常不错)https://github.com/baidu/uid-generator。

通过上面的几种策略可以比较的防护我们的时钟回拨,防止出现回拨之后大量的异常出现。下面是修改之后的代码,这里修改了时钟回拨的逻辑:

各方案对比

参考资料