文章
python实现雪花算法根据时间戳生成id
鉴于多进程会出现生成的唯一id相同的问题:因为uwsgi项目是以多进程的形式启动的,所以我采用每个uwsgi worker的id作为雪花生成器的机器id(worker_id);分布式的情况下,可以给集群中的机器编号,将每台机器的编号代入datacenter_id。这样在正常的生产环境中就可以保证生成的雪花id不重复了。还可以通过增加循环位(sequence)的位数来提高每毫秒可生成的雪花id的最大数量,理论上可以无限大。
import time
import uwsgi
import os
# Field widths of the generated ID.
# Ten bits identify the node: by default the upper 5 bits are the data
# center and the lower 5 bits are the machine inside that data center,
# which allows 32 * 32 = 1024 distinct nodes.
WORKER_ID_BITS = 5  # machine id inside one data center
DATACENTER_ID_BITS = 5  # data center id
# Counter used to tell apart ids produced within the same millisecond:
# 2 ** 12 = 4096 values (0..4095) per millisecond.
SEQUENCE_BITS = 12

# Largest value each field can hold (all of its bits set).
MAX_WORKER_ID = (1 << WORKER_ID_BITS) - 1  # 2 ** 5 - 1 == 0b11111
MAX_DATACENTER_ID = (1 << DATACENTER_ID_BITS) - 1

# Left-shift distance of each field; every field sits directly above the
# one before it. The spelling "WOKER" is kept because the name is
# referenced under that spelling elsewhere in this file.
WOKER_ID_SHIFT = SEQUENCE_BITS
DATACENTER_ID_SHIFT = WOKER_ID_SHIFT + WORKER_ID_BITS
TIMESTAMP_LEFT_SHIFT = DATACENTER_ID_SHIFT + DATACENTER_ID_BITS

# Mask that wraps the sequence counter back to 0 after its maximum.
SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1

# Twitter epoch timestamp, in milliseconds.
TWEPOCH = 1288834974657
class InvalidSystemClock(Exception):
    """Raised when the system clock is observed to have moved backwards."""
class IdWorker(object):
    """Snowflake-style ID generator that takes its worker id from uWSGI.

    Each id packs, from high bits to low bits: milliseconds elapsed since
    ``TWEPOCH``, the data center id, the worker id and a per-millisecond
    sequence counter.
    """

    def __init__(self, datacenter_id=0, worker_id=0, sequence=0):
        """Set up the generator state.

        :param datacenter_id: data center (machine region) id,
            between 0 and ``MAX_DATACENTER_ID``
        :param worker_id: machine id. NOTE: the value passed in is
            discarded and replaced by ``uwsgi.worker_id()``.
        :param sequence: starting sequence number
        :raises ValueError: if the uWSGI worker id or ``datacenter_id``
            is outside its allowed range
        """
        print("uwsgi.worker_id():", uwsgi.worker_id())
        # The uWSGI worker number always overrides the argument so that
        # every uWSGI process ends up with a different worker id.
        worker_id = uwsgi.worker_id()
        print(f"init_IdWorker worker_id:{worker_id}")

        # Reject ids that would not fit in their bit fields.
        if worker_id < 0 or worker_id > MAX_WORKER_ID:
            raise ValueError('worker_id值越界')
        if datacenter_id < 0 or datacenter_id > MAX_DATACENTER_ID:
            raise ValueError('datacenter_id值越界')

        self.worker_id = worker_id
        self.datacenter_id = datacenter_id
        self.sequence = sequence
        # Timestamp used for the previous id; -1 means "none yet".
        self.last_timestamp = -1

    def _gen_timestamp(self):
        """Return the current wall-clock time as integer milliseconds."""
        millis = time.time() * 1000
        return int(millis)

    def get_ids(self, count):
        """Return a list of ``count`` ids, each produced by ``get_id``."""
        return [self.get_id() for _ in range(count)]

    def get_id(self):
        """Generate the next id.

        :return: the id as a decimal string
        :raises InvalidSystemClock: if the current time is earlier than
            the time used for the previous id
        """
        now = self._gen_timestamp()
        previous = self.last_timestamp

        # Clock moved backwards: refuse to hand out an id.
        if now < previous:
            print(f'clock is moving backwards. Rejecting requests until {previous}')
            raise InvalidSystemClock

        if now != previous:
            # First id of a new millisecond.
            self.sequence = 0
        else:
            # Same millisecond: advance the counter, wrapping at the mask.
            self.sequence = (self.sequence + 1) & SEQUENCE_MASK
            if self.sequence == 0:
                # Counter exhausted for this millisecond; wait for the next.
                now = self._til_next_millis(previous)

        self.last_timestamp = now

        time_part = (now - TWEPOCH) << TIMESTAMP_LEFT_SHIFT
        datacenter_part = self.datacenter_id << DATACENTER_ID_SHIFT
        worker_part = self.worker_id << WOKER_ID_SHIFT
        return str(time_part | datacenter_part | worker_part | self.sequence)

    def _til_next_millis(self, last_timestamp):
        """Busy-wait until the clock is past ``last_timestamp``; return it."""
        while True:
            current = self._gen_timestamp()
            if current > last_timestamp:
                return current
if __name__ == '__main__':
    # Demo: print one id, then the number of characters in a second id.
    # The worker_id passed here is replaced by uwsgi.worker_id() inside
    # the IdWorker constructor.
    worker = IdWorker(datacenter_id=0, worker_id=1)
    first_id = worker.get_id()
    print(first_id)
    second_id = str(worker.get_id())
    print(len(second_id))
deepseek版本
import time
from threading import Lock
class Snowflake:
    """Snowflake-style ID generator guarded by a lock.

    An id is built, from high bits to low bits, from the milliseconds
    elapsed since ``epoch``, a 5-bit data center id, a 5-bit worker id and
    a 12-bit per-millisecond sequence counter.
    """

    def __init__(self, data_center_id, worker_id, epoch=1672531200000):
        """Create a generator.

        :param data_center_id: data center id (0-31)
        :param worker_id: worker node id (0-31)
        :param epoch: start timestamp in milliseconds; the default
            1672531200000 is 2023-01-01 00:00:00 UTC
            (08:00:00 in UTC+8)
        :raises ValueError: if either id is outside its range
        """
        # Field widths.
        seq_bits, node_bits, dc_bits = 12, 5, 5
        self.sequence_bits = seq_bits
        self.worker_bits = node_bits
        self.data_center_bits = dc_bits

        # Shift distances: each field sits directly above the previous one.
        self.worker_shift = seq_bits
        self.data_center_shift = seq_bits + node_bits
        self.timestamp_shift = seq_bits + node_bits + dc_bits

        # Largest value each field can hold.
        self.max_data_center_id = (1 << dc_bits) - 1
        self.max_worker_id = (1 << node_bits) - 1
        self.max_sequence = (1 << seq_bits) - 1

        # Argument validation.
        if data_center_id < 0 or data_center_id > self.max_data_center_id:
            raise ValueError(
                "数据中心ID必须在0-{}之间".format(self.max_data_center_id))
        if worker_id < 0 or worker_id > self.max_worker_id:
            raise ValueError(
                "工作节点ID必须在0-{}之间".format(self.max_worker_id))

        self.epoch = epoch
        # Both ids are stored already shifted into their final position.
        self.data_center_id = data_center_id << self.data_center_shift
        self.worker_id = worker_id << self.worker_shift

        # Mutable state, only touched while holding the lock.
        self.sequence = 0
        self.last_timestamp = -1
        self.lock = Lock()

    def _get_timestamp(self):
        """Return the current time as integer milliseconds."""
        millis = time.time() * 1000
        return int(millis)

    def _til_next_millis(self, last_timestamp):
        """Busy-wait until the clock is past ``last_timestamp``; return it."""
        while True:
            current = self._get_timestamp()
            if current > last_timestamp:
                return current

    def generate_id(self):
        """Generate the next id as an ``int``.

        :raises RuntimeError: if the current time is earlier than the time
            used for the previous id
        """
        with self.lock:
            now = self._get_timestamp()
            previous = self.last_timestamp

            # Clock moved backwards.
            if now < previous:
                raise RuntimeError("系统时钟回拨,请检查服务器时间")

            if now != previous:
                # New millisecond: restart the counter.
                self.sequence = 0
            else:
                # Same millisecond: advance the counter, wrapping at the max.
                self.sequence = (self.sequence + 1) & self.max_sequence
                if self.sequence == 0:
                    # Counter exhausted; wait for the next millisecond.
                    now = self._til_next_millis(previous)

            self.last_timestamp = now

            elapsed = now - self.epoch
            return ((elapsed << self.timestamp_shift)
                    | self.data_center_id
                    | self.worker_id
                    | self.sequence)
# Usage example
if __name__ == "__main__":
    # Build a generator for data center 1, worker node 2 and print one id.
    sf = Snowflake(data_center_id=1, worker_id=2)
    new_id = sf.generate_id()
    print("生成的ID:", new_id)
java版本
/**
 * Snowflake-style 64-bit ID generator.
 *
 * <p>An id is built, from high bits to low bits, from the milliseconds
 * elapsed since {@link #EPOCH}, a 5-bit data center id, a 5-bit worker id
 * and a 12-bit per-millisecond sequence counter.
 */
public class Snowflake {

    /** Start timestamp in milliseconds: 2023-01-01 00:00:00 UTC (08:00:00 in UTC+8). */
    private static final long EPOCH = 1672531200000L;

    /** Width of the data center id field. */
    private static final int DATA_CENTER_BITS = 5;
    /** Width of the worker id field. */
    private static final int WORKER_BITS = 5;
    /** Width of the sequence field. */
    private static final int SEQUENCE_BITS = 12;

    /** Largest data center id (31). */
    private static final long MAX_DATA_CENTER_ID = (1L << DATA_CENTER_BITS) - 1;
    /** Largest worker id (31). */
    private static final long MAX_WORKER_ID = (1L << WORKER_BITS) - 1;
    /** Largest sequence value (4095); also used as the wrap-around mask. */
    private static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;

    /** Left shift of the worker id. */
    private static final int WORKER_SHIFT = SEQUENCE_BITS;
    /** Left shift of the data center id. */
    private static final int DATA_CENTER_SHIFT = WORKER_SHIFT + WORKER_BITS;
    /** Left shift of the timestamp. */
    private static final int TIMESTAMP_SHIFT = DATA_CENTER_SHIFT + DATA_CENTER_BITS;

    /** Data center id, already shifted into its final position. */
    private final long dataCenterPart;
    /** Worker id, already shifted into its final position. */
    private final long workerPart;

    /** Counter within the current millisecond. */
    private long sequence = 0L;
    /** Timestamp used for the previous id; -1 means none yet. */
    private long lastTimestamp = -1L;

    /**
     * Creates a generator.
     *
     * @param dataCenterId data center id (0-31)
     * @param workerId     worker id (0-31)
     * @throws IllegalArgumentException if either id is outside its range
     */
    public Snowflake(long dataCenterId, long workerId) {
        if (dataCenterId < 0 || dataCenterId > MAX_DATA_CENTER_ID) {
            throw new IllegalArgumentException("数据中心ID必须在0-" + MAX_DATA_CENTER_ID + "之间");
        }
        if (workerId < 0 || workerId > MAX_WORKER_ID) {
            throw new IllegalArgumentException("机器ID必须在0-" + MAX_WORKER_ID + "之间");
        }
        this.dataCenterPart = dataCenterId << DATA_CENTER_SHIFT;
        this.workerPart = workerId << WORKER_SHIFT;
    }

    /**
     * Generates the next id. The method is {@code synchronized}, so calls on
     * the same instance are serialized.
     *
     * @return the 64-bit id
     * @throws RuntimeException if the current time is earlier than the time
     *                          used for the previous id
     */
    public synchronized long generateId() {
        long now = System.currentTimeMillis();

        // Clock moved backwards.
        if (now < lastTimestamp) {
            throw new RuntimeException("系统时钟回拨,请检查服务器时间");
        }

        if (now != lastTimestamp) {
            // New millisecond: restart the counter.
            sequence = 0L;
        } else {
            // Same millisecond: advance the counter, wrapping at the mask.
            sequence = (sequence + 1) & MAX_SEQUENCE;
            if (sequence == 0L) {
                // Counter exhausted; wait for the next millisecond.
                now = tilNextMillis(lastTimestamp);
            }
        }
        lastTimestamp = now;

        // Combine the fields into the final id.
        long elapsed = now - EPOCH;
        return (elapsed << TIMESTAMP_SHIFT) | dataCenterPart | workerPart | sequence;
    }

    /**
     * Busy-waits until the clock is past the given timestamp.
     *
     * @param lastTimestamp timestamp used for the previous id
     * @return the first observed timestamp greater than {@code lastTimestamp}
     */
    private long tilNextMillis(long lastTimestamp) {
        long now;
        do {
            now = System.currentTimeMillis();
        } while (now <= lastTimestamp);
        return now;
    }

    /** Demo entry point: prints one generated id. */
    public static void main(String[] args) {
        Snowflake generator = new Snowflake(1, 2);
        long id = generator.generateId();
        System.out.println("生成的ID: " + id);
    }
}