Hike News
Hike News

使用apollo配置中心生成雪花算法的workerId

前言

在分布式系统中,有一些需要使用全局唯一ID的场景,这时候雪花算法(snowflake)就是一种不错的解决方式。

但是雪花算法需要用到一个workerId,同一个应用部署多个实例的时候workerId不能相同。

使用环境变量或者启动参数来指定workerId的方式虽然在一定程度上解决了workerId的问题,但是给容器环境下的自动化部署或者动态扩容带来了新的挑战。

Apollo配置中心介绍

Apollo(阿波罗)是携程框架部门研发的分布式配置中心,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性,适用于微服务配置管理场景。

GitHub地址:https://github.com/ctripcorp/apollo
Apollo架构图

Apollo分为Client、Config Service、Admin Service、Portal四个部分:

  • Client是一个jar包集成在业务应用里,通过Meta Server从Config Service获取配置信息;
  • Config Service提供配置的读取、推送等功能,服务对象是Apollo客户端(Client);
  • Admin Service提供配置的修改、发布等功能,服务对象是Apollo管理界面(Portal);
  • Portal通过Meta Server连接Admin Service修改、发布配置信息。

ConfigDB存储配置信息,由Config Service和Admin Service使用;

PortalDB存储权限和审计信息,由Portal使用。

改造Config Service生成workerId

原理介绍

Client端通过/configs/{appId}/{clusterName}/{namespace:.+}接口拉取全量配置信息,对应方法是com.ctrip.framework.apollo.configservice.controller.ConfigController#queryConfig。方法的返回值ApolloConfig有5个属性:appIdclusternamespaceNameconfigurationsreleaseKey,只要把生成的workerId放到configurations这个Map类型的属性中,Client端就能直接通过Key值拿到workerId来使用。

具体实现

一、数据库改动

ConfigDB库的instance表存储了使用配置的应用实例。

在Instance表新加一个NodeId字段用来存储生成的workerId,并且将AppId和NodeId设置为唯一索引:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE `instance` (
`Id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增Id',
`AppId` varchar(32) NOT NULL DEFAULT 'default' COMMENT 'AppID',
`ClusterName` varchar(32) NOT NULL DEFAULT 'default' COMMENT 'ClusterName',
`DataCenter` varchar(64) NOT NULL DEFAULT 'default' COMMENT 'Data Center Name',
`Ip` varchar(32) NOT NULL DEFAULT '' COMMENT 'instance ip',
`NodeId` int(5) DEFAULT NULL COMMENT '节点id',
`DataChange_CreatedTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`DataChange_LastTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后修改时间',
PRIMARY KEY (`Id`),
UNIQUE KEY `IX_UNIQUE_KEY` (`AppId`,`ClusterName`,`Ip`,`DataCenter`),
UNIQUE KEY `uk_appId_nodeId` (`AppId`,`NodeId`) USING BTREE,
KEY `IX_IP` (`Ip`),
KEY `IX_DataChange_LastTime` (`DataChange_LastTime`)
) ENGINE=InnoDB AUTO_INCREMENT=6779 DEFAULT CHARSET=utf8mb4 COMMENT='使用配置的应用实例';

二、代码改动

原来Instance表的记录是在获取配置时调用com.ctrip.framework.apollo.configservice.util.InstanceConfigAuditUtil#audit异步添加的,这里需要改成获取配置时同步添加实例信息并且放到ApolloConfigconfigurations属性中。

1.com.ctrip.framework.apollo.configservice.controller.ConfigController

queryConfig方法返回前获取实例信息并返回

1
2
3
4
5
6
Instance instance = instanceService.findInstance(appId, clusterName, dataCenter, clientIp);
Integer nodeId = instance.getNodeId();
if (nodeId == null) {
nodeId = -1;
}
apolloConfig.addConfig("apollo.node.id", String.valueOf(nodeId));

2.com.ctrip.framework.apollo.biz.service.InstanceService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@Value("${apollo.node.minId}")//workerId最小值,0-1023之间
private Integer minId;

@Value("${apollo.node.maxId}")//workerId最大值,0-1023之间
private Integer maxId;

private static final Integer DEFAULT_NODE_ID = null;

public Instance findInstance(String appId, String clusterName, String dataCenter, String ip) {
Instance instance = instanceRepository.findByAppIdAndClusterNameAndDataCenterAndIp(appId, clusterName,
dataCenter, ip);
if (instance == null) {
instance = createInstance(appId, clusterName, dataCenter, ip);
}
if (instance.getNodeId() == null) {
updateInstance(instance);
}
return instance;
}

private Instance createInstance(String appId, String clusterName, String dataCenter, String ip) {
Instance instance = new Instance();
instance.setAppId(appId);
instance.setClusterName(clusterName);
instance.setDataCenter(dataCenter);
instance.setIp(ip);
instance.setNodeId(generateNodeId(appId));
try {
instance = createInstance(instance);
} catch (DataIntegrityViolationException e) {
//使用数据库的唯一约束来保证workerId的唯一性
logger.error("createInstance.error,instance={}", instance);
instance = findInstance(appId, clusterName, dataCenter, ip);
if (instance == null) {
instance = createInstance(appId, clusterName, dataCenter, ip);
}
}
return instance;
}

private void updateInstance(Instance instance) {
try {
instance.setNodeId(generateNodeId(instance.getAppId()));
instanceRepository.save(instance);
} catch (DataIntegrityViolationException e) {
logger.error("updateInstance.error,instance={}", instance);
updateInstance(instance);
}
}

private Integer generateNodeId(String appId) {
//生成规则是当前appId最大workerId + 1
//如果workerId已经到最大值了就拿最早的workerId(这里假设一个应用不会同时有1023(maxId)个实例在运行)
Integer maxNodeId = instanceRepository.maxNodeIdByAppId(appId);
if (maxNodeId == null || minId > maxNodeId) {
return minId;
}

if (maxNodeId < maxId) {
return maxNodeId + 1;
}

return getEarliestNodeId(appId);
}

private Integer getEarliestNodeId(String appId) {
InstanceConfig instanceConfig = instanceConfigRepository.findTopByConfigAppIdOrderByReleaseDeliveryTime(appId);
if (instanceConfig == null) {
return DEFAULT_NODE_ID;
}
Instance instance = instanceRepository.findOne(instanceConfig.getInstanceId());
if (instance == null) {
return DEFAULT_NODE_ID;
}
Integer nodeId = instance.getNodeId();
instance.setNodeId(null);
instanceRepository.save(instance);
return nodeId;
}

三、使用方式

和其他配置的使用方式一样,直接用@Value("${apollo.node.id}")

这种方式生成的workerId可以直接使用在只需要一个workerId参数的雪花算法,另一种需要datacenterId和workerId两个参数的雪花算法在使用时需要做一下转换:

1
2
datacenterId=apollo.node.id >> 5
workerId=apolo.node.id & 31

取高五位的值作为datacenterId,取低五位的值作为workerId。

  • 一个workerId参数的雪花算法,workerId最大1023,二进制为1111111111;
  • datacenterId和workerId两个参数的雪花算法,datacenterId最大31,二进制为11111,workerId最大31,二进制为11111,datacenterId作为二进制的高5位,workerId作为二进制的低5位,组合起来1111111111,十进制为1023。