一、需求分析
上节基于 Etcd 完成了基础的注册中心,能够注册和获取服务和节点信息。
但目前系统仅仅是处于可用的程度,还有很多需要解决的问题和可优化点:
- 数据一致性:服务提供者如果下线了,注册中心需要即时更新,剔除下线节点。否则消费者可能会调用到已经下线的节点。
- 性能优化:服务消费者每次都需要从注册中心获取服务,可以使用缓存进行优化。
- 高可用性:保证注册中心本身不会宕机。
- 可扩展性:实现更多其他种类的注册中心。
本节将实践 4 个注册中心的优化点:
- 心跳检测和续期机制
- 服务节点下线机制
- 消费端服务缓存
- 基于 ZooKeeper 的注册中心实现
二、注册中心优化
心跳检测和续期机制
心跳检测介绍
心跳检测(俗称 heartBeat)是一种用于监测系统是否正常工作的机制。它通过定期发送 心跳信号(请求)来检测目标系统的状态。
如果接收方在一定时间内没有收到心跳信号或者未能正常响应请求,就会认为目标系统故障或不可用,从而触发相应的处理或告警机制。
心跳检测的应用场景非常广泛,尤其是在分布式、微服务系统中,比如集群管理、服务健康检查等。
我们怎么检测自己做的 web 后端是否正常运行呢?
一个最简单的方法,就是写一个心跳检测接口,比如:
1 2 3 4 5 6 7 8 9 10 11 12
| @RestController class HealthCheckController {
@GetMapping("/actuator/health") public String healthCheck() {
return "OK"; } }
|
然后只需要执行一个脚本,定期调用这个接口,如果调用失败,就知道系统故障了。
方案设计
1)从心跳检测的概念来看,实现心跳检测一般需要 2 个关键:定时、网络请求。
但是使用 Etcd 实现心跳检测会更简单一些,因为 Etcd 自带了 key 过期机制,我们不妨换个思路:给节点注册信息一个 “生命倒计时”,让节点定期 续期,重置 自己的 倒计时。如果节点已宕机,一直不续期,Etcd 就会对 key 进行过期删除。
一句话总结:到时间还不续期就是寄了。
在 Etcd 中,我们要实现心跳检测和续期机制,可以遵循如下步骤:
- 服务提供者向 Etcd 注册自己的服务信息,并在注册时设置 TTL(生存时间)。
- Etcd 在接收到服务提供者的注册信息后,会自动维护服务信息的 TTL,并在 TTL 过期时删除该服务信息。
- 服务提供者定期请求 Etcd 续签自己的注册信息,重写 TTL。
需要注意的是,续期时间一定要小于过期时间,允许一次容错的机会。
2)每个服务提供者都需要找到自己注册的节点、续期自己的节点,但问题是,怎么找到当前服务提供者项目自己的节点呢?
那就充分利用本地的特性,在服务提供者本地维护一个 已注册节点集合,注册时添加节点 key 到集合中,只需要续期集合内的 key 即可。
开发实现
1)给注册中心 Registry
接口补充心跳检测方法,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12
|
public interface Registry {
...
void heartBeat(); }
|
2)维护续期节点集合。
定义一个本机注册的节点 key 集合,用于维护续期:
1 2 3 4
|
private final Set<String> localRegisterNodeKeySet = new HashSet<>();
|
在服务注册时,需要将节点添加到集合中,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public void register(ServiceMetaInfo serviceMetaInfo) throws Exception { Lease leaseClient = client.getLeaseClient();
long leaseId = leaseClient.grant(30).get().getID();
String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey(); ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8); ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);
PutOption putOption = PutOption.builder().withLeaseId(leaseId).build(); kvClient.put(key, value, putOption).get(); localRegisterNodeKeySet.add(registerKey); }
|
同理,在服务注销时,也要从集合中移除对应节点:
1 2 3 4 5 6
| public void unRegister(ServiceMetaInfo serviceMetaInfo) { String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey(); kvClient.delete(ByteSequence.from(registerKey, StandardCharsets.UTF_8)); localRegisterNodeKeySet.remove(registerKey); }
|
3)在 EtcdRegistry
中实现 heartBeat 方法。
可以使用 Hutool 工具类的 CronUtil 实现定时任务,对所有集合中的节点执行 重新注册 操作,这是一个小 trick,就相当于续签了。
心跳检测方法的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Override public void heartBeat() { CronUtil.schedule("*/10 * * * * *", new Task() { @Override public void execute() { for (String key : localRegisterNodeKeySet) { try { List<KeyValue> keyValues = kvClient.get(ByteSequence.from(key, StandardCharsets.UTF_8)) .get() .getKvs(); if (CollUtil.isEmpty(keyValues)) { continue; } KeyValue keyValue = keyValues.get(0); String value = keyValue.getValue().toString(StandardCharsets.UTF_8); ServiceMetaInfo serviceMetaInfo = JSONUtil.toBean(value, ServiceMetaInfo.class); register(serviceMetaInfo); } catch (Exception e) { throw new RuntimeException(key + "续签失败", e); } } } });
CronUtil.setMatchSecond(true); CronUtil.start(); }
|
采用这种实现方案的好处是,即时 Etcd 注册中心的数据出现了丢失,通过心跳检测机制也会重新注册节点信息。
4)开启 heartBeat。
在注册中心初始化的 init 方法中,调用 heartBeat 方法即可。
代码如下:
1 2 3 4 5 6 7 8 9
| @Override public void init(RegistryConfig registryConfig) { client = Client.builder() .endpoints(registryConfig.getAddress()) .connectTimeout(Duration.ofMillis(registryConfig.getTimeout())) .build(); kvClient = client.getKVClient(); heartBeat(); }
|
服务节点下线机制
当服务提供者节点宕机时,应该从注册中心移除掉已注册的节点,否则会影响消费端调用。所以我们需要设计一套服务节点下线机制。
方案设计
服务节点下线又分为:
- 主动下线:服务提供者项目正常退出时,主动从注册中心移除注册信息。
- 被动下线:服务提供者项目异常推出时,利用 Etcd 的 key 过期机制自动移除。
被动下线已经可以利用 Etcd 的机制实现了,我们主要开发主动下线。
问题是,怎么在 Java 项目正常退出时,执行某个操作呢?
其实非常简单,利用 JVM 的 ShutdownHook 就能实现。
JVM 的 ShutdownHook 是 Java 虚拟机提供的一种机制,允许开发者在 JVM 即将关闭之前执行一些清理工作或其他必要的操作,例如关闭数据库连接、释放资源、保存临时数据等。
Spring Boot 也提供了类似的停机能力。
开发实现
1)完善 Etcd 注册中心的 destroy
方法,补充下线节点的逻辑。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public void destroy() { System.out.println("当前节点下线"); for (String key : localRegisterNodeKeySet) { try { kvClient.delete(ByteSequence.from(key, StandardCharsets.UTF_8)).get(); } catch (Exception e) { throw new RuntimeException(key + "节点下线失败"); } }
if (kvClient != null) { kvClient.close(); } if (client != null) { client.close(); } }
|
2)在 RpcApplication
的 init 方法中,注册 Shutdown Hook,当程序正常退出时会执行注册中心的 destroy 方法。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12
| public static void init(RpcConfig newRpcConfig) { rpcConfig = newRpcConfig; log.info("rpc init, config = {}", newRpcConfig.toString()); RegistryConfig registryConfig = rpcConfig.getRegistryConfig(); Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry()); registry.init(registryConfig); log.info("registry init, config = {}", registryConfig); Runtime.getRuntime().addShutdownHook(new Thread(registry::destroy)); }
|
消费端服务缓存
正常情况下,服务节点信息列表的更新频率是不高的,所以在服务消费者从注册中心获取到服务节点信息列表后,完全可以 缓存在本地,下次就不用再请求注册中心获取了,能够提高性能。
1、增加本地缓存
本地缓存的实现很简单,用一个列表来存储服务信息即可,提供操作列表的基本方法,包括:写缓存、读缓存、清空缓存。
暂时先只考虑单服务(相同 serviceKey)的缓存。如果要实现多服务缓存,可以改为使用 Map 接口。
在 registry
包下新增缓存类 RegistryServiceCache
,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
public class RegistryServiceCache {
List<ServiceMetaInfo> serviceCache;
void writeCache(List<ServiceMetaInfo> newServiceCache) { this.serviceCache = newServiceCache; }
List<ServiceMetaInfo> readCache() { return this.serviceCache; }
void clearCache() { this.serviceCache = null; } }
|
2、使用本地缓存
1)修改 EtcdRegisty
的代码,使用本地缓存对象:
1 2 3 4
|
private final RegistryServiceCache registryServiceCache = new RegistryServiceCache();
|
2)修改服务发现逻辑,优先从缓存获取服务;如果没有缓存,再从注册中心获取,并且设置到缓存中。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Override public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) { List<ServiceMetaInfo> cachedServiceMetaInfoList = registryServiceCache.readCache(); if (cachedServiceMetaInfoList != null) { return cachedServiceMetaInfoList; }
String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";
try { GetOption getOption = GetOption.builder().isPrefix(true).build(); List<KeyValue> keyValues = kvClient.get( ByteSequence.from(searchPrefix, StandardCharsets.UTF_8), getOption) .get() .getKvs(); List<ServiceMetaInfo> serviceMetaInfoList = keyValues.stream() .map(keyValue -> { String key = keyValue.getKey().toString(StandardCharsets.UTF_8); watch(key); String value = keyValue.getValue().toString(StandardCharsets.UTF_8); return JSONUtil.toBean(value, ServiceMetaInfo.class); }) .collect(Collectors.toList()); registryServiceCache.writeCache(serviceMetaInfoList); return serviceMetaInfoList; } catch (Exception e) { throw new RuntimeException("获取服务列表失败", e); } }
|
3、服务缓存更新 - 监听机制
当服务注册信息发生变更(比如节点下线)时,需要即时更新消费端缓存。
问题是,怎么知道服务注册信息什么时候发生变更呢?
这就需要我们使用 Etcd 的 watch 监听机制,当监听的某个 key 发生修改或删除时,就会触发事件来通知监听者。
什么时候去创建 watch 监听器呢?
首先要明确 watch 监听是服务消费者还是服务提供者执行的。由于我们的目标是更新缓存,缓存是在服务消费端维护和使用的,所以也应该是服务消费端去 watch。
也就是说,只有服务消费者执行的方法中,可以创建 watch 监听器,那么比较合适的位置就是服务发现方法(serviceDiscovery)。可以对本次获取到的所有服务节点 key 进行监听。
还需要防止重复监听同一个 key,可以通过定义一个已监听 key 的集合来实现。
下面开发编码。
1)Registry 注册中心接口补充监听 key 的方法,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
public interface Registry {
void watch(String serviceNodeKey); }
|
2)EtcdRegistry
类中,新增监听 key 的集合。
可以使用 ConcurrentHashSet
防止并发冲突,代码如下:
1 2 3 4
|
private final Set<String> watchingKeySet = new ConcurrentHashSet<>();
|
3)在 EtcdRegistry
类中实现监听 key 的方法。
通过调用 Etcd 的 WatchClient
实现监听,如果出现了 DELETE
key 删除事件,则清理服务注册缓存。
注意,即使 key 在注册中心被删除后再重新设置,之前的监听依旧生效。所以我们只监听首次加入到监听集合的 key,防止重复。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
@Override public void watch(String serviceNodeKey) { Watch watchClient = client.getWatchClient(); boolean newWatch = watchingKeySet.add(serviceNodeKey); if (newWatch) { watchClient.watch(ByteSequence.from(serviceNodeKey, StandardCharsets.UTF_8), response -> { for (WatchEvent event : response.getEvents()) { switch (event.getEventType()) { case DELETE: registryServiceCache.clearCache(); break; case PUT: default: break; } } }); } }
|
4)在消费端获取服务时调用 watch 方法,对获取到的服务节点 key 进行监听。
修改服务发现方法的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) { List<ServiceMetaInfo> cachedServiceMetaInfoList = registryServiceCache.readCache(); if (cachedServiceMetaInfoList != null) { return cachedServiceMetaInfoList; }
String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";
try { GetOption getOption = GetOption.builder().isPrefix(true).build(); List<KeyValue> keyValues = kvClient.get( ByteSequence.from(searchPrefix, StandardCharsets.UTF_8), getOption) .get() .getKvs(); List<ServiceMetaInfo> serviceMetaInfoList = keyValues.stream() .map(keyValue -> { String key = keyValue.getKey().toString(StandardCharsets.UTF_8); watch(key); String value = keyValue.getValue().toString(StandardCharsets.UTF_8); return JSONUtil.toBean(value, ServiceMetaInfo.class); }) .collect(Collectors.toList()); registryServiceCache.writeCache(serviceMetaInfoList); return serviceMetaInfoList; } catch (Exception e) { throw new RuntimeException("获取服务列表失败", e); } }
|
ZooKeeper 注册中心实现
这部分不作为学习重点,理解了一种注册中心的实现方式,再用其他技术实现注册中心就很简单了。
和 Etcd 注册中心的实现方式极其相似,步骤如下:
- 安装 ZooKeeper
- 引入客户端依赖
- 实现接口
- SPI 补充 ZooKeeper 注册中心
1)本地下载并启动 ZooKeeper。
下载链接:https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
正常启动 ZooKeeper 后,默认会占用几个端口号,比如 2181(客户端)、8080(管理端)等。
2)引入客户端依赖。
一般会使用 Apache Curator 来操作 ZooKeeper,可以参考官方文档:https://curator.apache.org/docs/getting-started 。
引入的依赖代码如下:
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>5.6.0</version>
</dependency>
|
3)ZooKeeper 注册中心实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
|
@Slf4j public class ZooKeeperRegistry implements Registry {
private CuratorFramework client;
private ServiceDiscovery<ServiceMetaInfo> serviceDiscovery;
private final Set<String> localRegisterNodeKeySet = new HashSet<>();
private final RegistryServiceCache registryServiceCache = new RegistryServiceCache();
private final Set<String> watchingKeySet = new ConcurrentHashSet<>();
private static final String ZK_ROOT_PATH = "/rpc/zk";
@Override public void init(RegistryConfig registryConfig) { client = CuratorFrameworkFactory .builder() .connectString(registryConfig.getAddress()) .retryPolicy(new ExponentialBackoffRetry(Math.toIntExact(registryConfig.getTimeout()), 3)) .build();
serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMetaInfo.class) .client(client) .basePath(ZK_ROOT_PATH) .serializer(new JsonInstanceSerializer<>(ServiceMetaInfo.class)) .build();
try { client.start(); serviceDiscovery.start(); } catch (Exception e) { throw new RuntimeException(e); } }
@Override public void register(ServiceMetaInfo serviceMetaInfo) throws Exception { serviceDiscovery.registerService(buildServiceInstance(serviceMetaInfo));
String registerKey = ZK_ROOT_PATH + "/" + serviceMetaInfo.getServiceNodeKey(); localRegisterNodeKeySet.add(registerKey); }
@Override public void unRegister(ServiceMetaInfo serviceMetaInfo) { try { serviceDiscovery.unregisterService(buildServiceInstance(serviceMetaInfo)); } catch (Exception e) { throw new RuntimeException(e); } String registerKey = ZK_ROOT_PATH + "/" + serviceMetaInfo.getServiceNodeKey(); localRegisterNodeKeySet.remove(registerKey); }
@Override public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) { List<ServiceMetaInfo> cachedServiceMetaInfoList = registryServiceCache.readCache(); if (cachedServiceMetaInfoList != null) { return cachedServiceMetaInfoList; }
try { Collection<ServiceInstance<ServiceMetaInfo>> serviceInstanceList = serviceDiscovery.queryForInstances(serviceKey);
List<ServiceMetaInfo> serviceMetaInfoList = serviceInstanceList.stream() .map(ServiceInstance::getPayload) .collect(Collectors.toList());
registryServiceCache.writeCache(serviceMetaInfoList); return serviceMetaInfoList; } catch (Exception e) { throw new RuntimeException("获取服务列表失败", e); } }
@Override public void heartBeat() { }
@Override public void watch(String serviceNodeKey) { String watchKey = ZK_ROOT_PATH + "/" + serviceNodeKey; boolean newWatch = watchingKeySet.add(watchKey); if (newWatch) { CuratorCache curatorCache = CuratorCache.build(client, watchKey); curatorCache.start(); curatorCache.listenable().addListener( CuratorCacheListener .builder() .forDeletes(childData -> registryServiceCache.clearCache()) .forChanges(((oldNode, node) -> registryServiceCache.clearCache())) .build() ); } }
@Override public void destroy() { log.info("当前节点下线"); for (String key : localRegisterNodeKeySet) { try { client.delete().guaranteed().forPath(key); } catch (Exception e) { throw new RuntimeException(key + "节点下线失败"); } }
if (client != null) { client.close(); } }
private ServiceInstance<ServiceMetaInfo> buildServiceInstance(ServiceMetaInfo serviceMetaInfo) { String serviceAddress = serviceMetaInfo.getServiceHost() + ":" + serviceMetaInfo.getServicePort(); try { return ServiceInstance .<ServiceMetaInfo>builder() .id(serviceAddress) .name(serviceMetaInfo.getServiceKey()) .address(serviceAddress) .payload(serviceMetaInfo) .build(); } catch (Exception e) { throw new RuntimeException(e); } } }
|
4)SPI 增加对 ZooKeeper 的支持:
1 2
| etcd=com.yupi.yurpc.registry.EtcdRegistry zookeeper=com.yupi.yurpc.registry.ZooKeeperRegistry
|
5)最后,可以更改服务提供者和消费者的注册中心配置来测试。
更改的配置如下:
1 2
| rpc.registryConfig.registry=zookeeper rpc.registryConfig.address=localhost:2181
|