注册中心实现(Etcd)
注册中心实现(Etcd)
什么是注册中心?
注册中心(Service Registry)是 RPC 框架中非常重要的一个组件,它的主要作用包括以下几点:
服务注册
- 服务提供者在启动时,会将自己的服务信息(如服务名、地址、端口等)注册到注册中心。
- 注册中心负责维护这些服务的注册信息,为服务消费者提供查询。
服务发现
- 服务消费者在需要调用服务时,会查询注册中心获取可用的服务提供者信息。
- 注册中心根据负载均衡策略选择合适的服务实例,返回给消费者。
服务状态管理
- 注册中心会定期检查服务实例的健康状态,并及时更新注册信息。
- 当某个服务实例下线或者故障时,注册中心会将其从服务列表中剔除。
服务治理
- 注册中心可以提供服务分组、服务版本管理等功能,支持更细粒度的服务治理。
- 通过注册中心,可以实现服务的权重调整、灰度发布等高级特性。
用一张图来表示在本RPC项目中注册中心的作用,服务提供者把服务的信息放到注册中心中,消费者去注册中心服务发现,获取要调用的服务,然后再向服务提供者发起调用:
注册中心有哪些?
- Zookeeper: 分布式协调服务,广泛应用于 Dubbo、Kafka 等 RPC 框架中。
- Consul: HashiCorp 公司开源的服务网格解决方案,集成了注册中心、服务发现等功能。
- Eureka: Netflix 开源的服务注册中心,曾广泛应用于 Spring Cloud 生态。
- etcd: CoreOS 公司开源的分布式键值存储,也可用作注册中心。
- Nacos: Alibaba 开源的应用管理与服务治理平台,集成了注册中心、配置管理等功能。
注册中心作为 RPC 框架的核心组件,它的可靠性和性能对整个分布式系统的可用性都有重要影响。
注册中心设计
我们的注册中心需要实现几个关键的能力:
服务注册和发现
- 服务提供者在启动时将自身信息注册到注册中心。
- 服务消费者通过注册中心查找并调用可用的服务提供者。
- 注册中心需要提供高效的查询和路由机制,确保服务发现的性能。
服务健康检查
- 注册中心需要定期检查服务实例的健康状态,及时剔除下线或故障的服务。
- 健康检查可以通过心跳机制、TCP 连接检测等方式实现。
- 注册中心需要处理服务实例的上下线事件,并通知相关的服务消费者。
负载均衡策略
- 当存在多个可用的服务提供者时,注册中心需要根据负载均衡策略选择合适的实例。
- 常见的负载均衡策略包括随机、轮询、加权轮询、最小活跃连接数等。
- 注册中心需要根据服务实例的负载情况动态调整负载均衡策略。
服务元数据管理
- 注册中心需要存储服务提供者的详细元数据信息,如服务名称、版本、实例地址等。
- 这些元数据信息可以帮助服务消费者做更精细的服务选择和调用。
- 注册中心需要提供元数据的增删改查等管理功能。
服务治理能力
- 注册中心可以提供服务分组、服务权重、路由规则等高级服务治理功能。
- 这些功能可以帮助开发者实现更细粒度的服务管控,如灰度发布、流量控制等。
- 注册中心需要设计可扩展的架构,支持服务治理功能的灵活配置和动态调整。
高可用和容错
- 注册中心自身需要具备高可用性,避免单点故障。
- 可以采用主备复制、分布式部署等方式来提高可用性。
- 注册中心还需要考虑服务发现的容错性,在注册中心宕机时仍能保证服务调用。
性能和扩展性
- 注册中心需要支撑大规模的服务注册和发现,具备良好的性能表现。
- 可以采用分片、缓存、异步等手段来提高注册中心的吞吐量和响应速度。
- 注册中心的架构还需要具备良好的扩展性,以应对不断增长的服务规模。
用一张图来表示注册中心要实现的功能
一些功能如 负载均衡我们已经在RPC层实现过了,注册中心可以暂时不实现,主要实现几个核心的:
- 服务注册、服务发现、服务剔除、心跳检测、数据存储
技术选型Etcd
对于注册中心的技术选型,我们需要考虑它的性能,高可用性,高可靠性,稳定性,数据一致性等内容。
高可靠和高可用非要重要,如果注册中心都挂了,那么会影响到所有服务调用。
在本章节中,我们实现Etcd作为注册中心。https://etcd.io/
Etcd介绍
etcd(读作 et-see-dee)是一种开源的分布式统一键值存储,用于分布式系统或计算机集群的共享配置、服务发现和的调度协调。etcd 有助于促进更加安全的自动更新,协调向主机调度的工作,并帮助设置容器的覆盖网络。
etcd 是许多其他项目的核心组件。最值得注意的是,它是 Kubernetes 的首要数据存储,也是容器编排的实际标准系统。使用 etcd, 云原生应用可以保持更为一致的运行时间,而且在个别服务器发生故障时也能正常工作。应用从 etcd 读取数据并写入到其中;通过分散配置数据,为节点配置提供冗余和弹性。
服务元信息设计
ServiceMetaInfo是注册中心中服务的元数据信息,包含服务名称、版本、地址、权重等关键属性,为服务注册、发现等功能提供了基础数据模型。
/**
* 服务元信息 (注册信息)
*/
@Data
public class ServiceMetaInfo {
/**
* 服务名称
*/
private String serviceName;
/**
* 服务版本号
*/
private String serviceVersion = "1.0";
/**
* 服务地址
*/
private String serviceAddress;
/**
* 服务分组(暂未实现)
*/
private String serviceGroup = "default";
private String serviceHost;
private Integer servicePort;
/**
* 服务权重
*/
private int weight = 1;
/**
* 获取服务键名
*/
public String getServiceKey() {
return String.format("%s:%s", serviceName, serviceVersion);
}
/**
* 获取服务节点键名
*/
public String getServiceNodeKey() {
return String.format("%s/%s:%s", getServiceKey(), serviceHost,servicePort);
}
/**
* 获取完整的服务地址
*/
public String getServiceAddress() {
if (!StrUtil.contains(serviceHost, "http")) {
return String.format("http://%s:%s", serviceHost, servicePort);
}
return String.format("%s:%s", serviceHost, servicePort);
}
}
注册中心配置
注册中心配置主要包括:注册中心类别、地址、用户名、密码、超时时间。
package com.yunfei.rpc.config;
import com.yunfei.rpc.registry.RegistryKeys;
import lombok.Data;
/**
* RPC注册中心配置
*/
@Data
public class RegistryConfig {
/**
* 注册中心类别
*/
private String registry = RegistryKeys.ETCD;
/**
* 注册中心地址
*/
private String address = "http://localhost:2380";
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 超时时间 ms
*/
private Long timeout = 10000L;
}
注册中心接口
我们可能会有多个注册中心,它和我们的序列化器一样,都应该可以支持使用Java的SPI机制进行动态加载,例如注册中心我们可以在配置文件中设置,可以选用Zookeeper、Redis、Nacos等等。
接口主要提供:初始化、服务注册、服务发现 、注销服务、心跳检测、节点监听功能。
/**
* 注册中心接口
*/
public interface Registry {
/**
* 初始化
*
* @param registryConfig
*/
void init(RegistryConfig registryConfig);
/**
* 注册服务 服务端
*
* @param serviceMetaInfo
* @throws Exception
*/
void register(ServiceMetaInfo serviceMetaInfo) throws Exception;
/**
* 取消注册服务 服务端
*
* @param serviceMetaInfo
*/
void unRegister(ServiceMetaInfo serviceMetaInfo);
/**
* 服务发现 获取某服务的所有节点 客户端 消费端
*
* @param serviceKey
* @return
*/
List<ServiceMetaInfo> serviceDiscovery(String serviceKey);
/**
* 服务销毁
*/
void destroy();
/**
* 心跳检测
*/
void heartbeat();
/**
* 监听服务节点
*/
void watch(String serviceNodeKey);
}
Etcd注册中心实现
全局变量
private Client client;
private KV kvClient;
/**
* 根节点
*/
private static final String ETCD_ROOT_PATH = "/rpc/";
/**
* 本地注册节点 key 集合 用于维护续期
*/
private final Set<String> localRegisterNodeKeySet = new HashSet<>();
/**
* 注册中心缓存
*/
private final RegistryServiceCache registryServiceCache = new RegistryServiceCache();
/**
* 监听的key集合
*/
private final Set<String> watchingKeySet = new ConcurrentHashSet<>();
初始化
初始化主要是为了创建Etcd的client和KVClient对象
@Override
public void init(RegistryConfig registryConfig) {
client = Client.builder().endpoints(registryConfig.getAddress())
.connectTimeout(Duration.ofMillis(registryConfig.getTimeout())).build();
kvClient = client.getKVClient();
heartbeat();
}
服务注册
服务注册主要就是把键值对保存到Etcd中:
- 键是:ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey() 相当于一个唯一标识
- 值是:服务的元信息serviceMetaInfo
同时我们需要设置过期时间30s,用于剔除过期节点、心跳检测。这样如果服务提供者宕机了,也可以超时自动移除
@Override
public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
// 创建Lease 和KV客户端
Lease leaseClient = client.getLeaseClient();
// 创建租约 30s
long leaseId = leaseClient.grant(30).get().getID();
// 设置要存储的键值对
String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8);
ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);
// 将键值对与租约绑定 并设置过期时间
PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
kvClient.put(key, value, putOption).get();
localRegisterNodeKeySet.add(registerKey);
}
服务取消注册
即从Etcd中删除对应的服务节点信息,本地集合 也要删。
@Override
public void unRegister(ServiceMetaInfo serviceMetaInfo) {
String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
kvClient.delete(ByteSequence.from(registerKey, StandardCharsets.UTF_8));
localRegisterNodeKeySet.remove(registerKey);
}
服务发现
我们有一个本地缓存数组serviceCache:它的实现很简单:
public class RegistryServiceCache {
/**
* 服务缓存
*/
List<ServiceMetaInfo> serviceCache;
/**
* 写缓存
*/
void writeCache(List<ServiceMetaInfo> newServiceCache) {
serviceCache = newServiceCache;
}
/**
* 读缓存
*/
List<ServiceMetaInfo> readCache() {
return serviceCache;
}
/**
* 清空缓存
*/
void clearCache() {
serviceCache = null;
}
}
如果缓存中有,那么直接获取,就不用从Etcd中获取了
如果没有的话,那么就去搜索,根据前缀搜索,拿到结果之后再把服务进行解析为List<ServiceMetaInfo>
的样子
最后还可以 写入缓存中
@Override
public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
// 优先从缓存中获取
List<ServiceMetaInfo> serviceMetaInfoList = registryServiceCache.readCache();
if (CollUtil.isNotEmpty(serviceMetaInfoList)) {
return serviceMetaInfoList;
}
// 前缀搜索,结尾一定要加 '/'
String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";
GetOption getOption = GetOption.builder().isPrefix(true).build();
try {
List<KeyValue> keyValues = kvClient.get(ByteSequence.from(searchPrefix, StandardCharsets.UTF_8), getOption).get().getKvs();
// 解析服务
List<ServiceMetaInfo> serviceMetaInfos = keyValues.stream().map(keyValue -> {
String key = keyValue.getValue().toString(StandardCharsets.UTF_8);
// 监听key的变化
watch(key);
return JSONUtil.toBean(key, ServiceMetaInfo.class);
}).collect(Collectors.toList());
// 写入缓存
registryServiceCache.writeCache(serviceMetaInfos);
return serviceMetaInfos;
} catch (Exception e) {
throw new RuntimeException("服务发现失败", e);
}
}
服务销毁
当服务提供者关闭的时候,我们需要关闭所有注册的服务,同时也要删除本地即集合。
@Override
public void destroy() {
System.out.println("当前节点下线");
// 下线时,删除所有注册的服务
for (String key : localRegisterNodeKeySet) {
try {
kvClient.delete(ByteSequence.from(key, StandardCharsets.UTF_8)).get();
} catch (Exception e) {
throw new RuntimeException(key + "下线失败", e);
}
}
// 关闭客户端
if (client != null) {
client.close();
}
if (kvClient != null) {
kvClient.close();
}
}
服务下线分为:
- 被动下线:Etcd过期机制删除
- 主动下线:提供者主动从注册中心删除
如何在Java项目退出的时候,执行操作呢?我们可以 利用JVM的ShutDownHook来实现
@Slf4j
public class RpcApplication {
private static volatile RpcConfig rpcConfig;
public static void init(RpcConfig newRpcConfig) {
rpcConfig = newRpcConfig;
log.info("rpc application init success,config:{}", rpcConfig);
// // 注册中心初始化
RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
registry.init(registryConfig);
log.info("registry init success,config:{}", registryConfig);
// 创建并 注册Shutdown Hook ,JVM 退出时执行擦欧总
Runtime.getRuntime().addShutdownHook(new Thread(registry::destroy));
}
...
}
心跳检测
我们向Etcd定期发送心跳信号来检测目标是否存活,如果系统在一定时间内没有响应,那么就 认为目标系统故障或者不可用了。
Etcd实现心跳检测比较简单,因为Etcd自带了key过期的机制。我们给节点注册的时候加上了过期时间,再让节点定期去续期,重置过期时间,如果节点宕机了,一直不去续期,那么Etcd会自动删除过期key。
因此我们的心跳检测只需要去定期的续约即可。
我们怎么知道服务提供者要去维护哪些节点呢?
只需要在本地维护一个注册节点信息集合,注册的时候把Key加入到集合中,每次维护集合的信息。
@Override
public void heartbeat() {
// 100s续约一次
CronUtil.schedule("*/10 * * * * *", new Task() {
@Override
public void execute() {
for (String key : localRegisterNodeKeySet) {
try {
List<KeyValue> keyValues = kvClient.get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs();
// 节点已经过期了,需要重启节点才能重新注册
if (CollUtil.isEmpty(keyValues)) {
continue;
}
// 节点没有过期,重新 注册 相当于续约
KeyValue keyValue = keyValues.get(0);
String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
ServiceMetaInfo serviceMetaInfo = JSONUtil.toBean(value, ServiceMetaInfo.class);
register(serviceMetaInfo);
} catch (Exception e) {
throw new RuntimeException(key + "续约失败", e);
}
}
}
});
// 支持秒级别定时任务
CronUtil.setMatchSecond(true);
CronUtil.start();
}
服务监听
如果服务注册信息发生了变化,那么我们需要更新消费端缓存
@Override
public void watch(String serviceNodeKey) {
Watch watchClient = client.getWatchClient();
// 之间未被监听,添加监听
boolean newWatch = watchingKeySet.add(serviceNodeKey);
if (newWatch) {
watchClient.watch(ByteSequence.from(serviceNodeKey, StandardCharsets.UTF_8), (response) -> {
for (WatchEvent event : response.getEvents()) {
switch (event.getEventType()) {
case DELETE:
registryServiceCache.clearCache();
break;
case PUT:
default:
break;
}
}
});
}
}