一、需求分析
上节给 RPC 框架增加了重试机制,提升了服务消费端的可靠性和健壮性。
但如果重试超过了一定次数仍然失败,又该怎么处理呢?
或者说当调用出现失败时,我们一定要重试么?有没有其他的策略呢?
本节实现另一种提高服务消费端可靠性和健壮性的机制 —— 容错机制。
二、设计方案
容错机制
容错是指系统在出现异常情况时,可以通过一定的策略保证系统仍然稳定运行,从而提高系统的可靠性和健壮性。
在分布式系统中,容错机制尤为重要,因为分布式系统中的各个组件都可能存在网络故障、节点故障等各种异常情况。要顾全大局,尽可能消除偶发 / 单点故障对系统带来的整体影响。
打个比方,将分布式系统类比为一家公司,如果公司某个优秀员工请假了,需要 “触发容错”,让另一个普通员工顶上,这本质上是容错机制的一种 降级 策略。
容错机制一般都是在系统出现错误时才触发的,这点没什么好讲,需要重点学习的是容错策略和容错实现方式。
容错策略
容错策略有很多种,常用的容错策略主要是以下几个:
1)Fail-Over 故障转移:一次调用失败后,切换一个其他节点再次进行调用,也算是一种重试。
2)Fail-Back 失败自动恢复:系统的某个功能出现调用失败或错误时,通过其他的方法,恢复该功能的正常。可以理解为降级,比如重试、调用其他服务等。
3)Fail-Safe 静默处理:系统出现部分非重要功能的异常时,直接忽略掉,不做任何处理,就像错误没有发生过一样。
4)Fail-Fast 快速失败:系统出现调用错误时,立刻报错,交给外层调用方处理。
容错实现方式
容错其实是个比较广泛的概念,除了上面几种策略外,很多技术都可以起到容错的作用。
比如:
1)重试:重试本质上也是一种容错的降级策略,系统错误后再试一次。
2)限流:当系统压力过大、已经出现部分错误时,通过限制执行操作(接受请求)的频率或数量,对系统进行保护。
3)降级:系统出现错误后,改为执行其他更稳定可用的操作。也可以叫做 “兜底” 或 “有损服务”,这种方式的本质是:即使牺牲一定的服务质量,也要保证系统的部分功能可用,保证基本的功能需求得到满足。
4)熔断:系统出现故障或异常时,暂时中断对该服务的请求,而是执行其他操作,以避免连锁故障。
5)超时控制:如果请求或操作长时间没处理完成,就进行中断,防止阻塞和资源占用。
注意,在实际项目中,根据对系统可靠性的需求,我们通常会结合多种策略或方法实现容错机制。
容错方案设计
回归到我们的 RPC 框架,之前已经给系统增加重试机制了,算是实现了一部分的容错能力。
现在,我们可以正式引入容错机制,通过更多策略来进一步增加系统可靠性。
容错方案的设计可以是很灵活的,建议大家先自己思考。
这里提供 2 种方案:
1)先容错再重试。
当系统发生异常时,首先会触发容错机制,比如记录日志、进行告警等,然后可以选择是否进行重试。
这种方案其实是把重试当做容错机制的一种可选方案。
2)先重试再容错。在发生错误后,首先尝试重试操作,如果重试多次仍然失败,则触发容错机制,比如记录日志、进行告警等。
这 2 种方案其实可以结合使用。
系统错误时,先通过重试操作解决一些临时性的异常,比如网络波动、服务端临时不可用等;如果重试多次后仍然失败,说明可能存在更严重的问题,这时可以触发其他的容错策略,比如调用降级服务、熔断、限流、快速失败等,来减少异常的影响,保障系统的稳定性和可靠性。
举个具体的例子:
- 系统调用服务 A 出现网络错误,使用容错策略 - 重试。
- 重试 3 次失败后,使用其他容错策略 - 降级。
- 系统改为调用不依赖网络的服务 B,完成操作。
三、开发实现
1、多种容错策略实现
下面实现 2 种最基本的容错策略:Fail-Fast 快速失败、Fail-Safe 静默处理。
在 RPC 项目中新建 fault.tolerant
包,将所有容错相关的代码放到该包下。
1)先编写容错策略通用接口。提供一个容错方法,使用 Map 类型的参数接受上下文信息(可用于灵活地传递容错处理需要用到的数据),并且接受一个具体的异常类参数。
由于容错是应用到发送请求操作的,所以容错方法的返回值是 RpcResponse(响应)。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
public interface TolerantStrategy {
RpcResponse doTolerant(Map<String, Object> context, Exception e); }
|
2)快速失败容错策略实现。
很好理解,就是遇到异常后,将异常再次抛出,交给外层处理。
代码如下:
1 2 3 4 5 6 7 8 9 10 11
|
public class FailFastTolerantStrategy implements TolerantStrategy {
@Override public RpcResponse doTolerant(Map<String, Object> context, Exception e) { throw new RuntimeException("服务报错", e); } }
|
3)静默处理容错策略实现。
也很好理解,就是遇到异常后,记录一条日志,然后正常返回一个响应对象,就好像没有出现过报错。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
|
@Slf4j public class FailSafeTolerantStrategy implements TolerantStrategy {
@Override public RpcResponse doTolerant(Map<String, Object> context, Exception e) { log.info("静默处理异常", e); return new RpcResponse(); } }
|
4)其他容错策略。
还可以自行实现更多的容错策略,比如 FailBackTolerantStrategy
故障恢复策略:
1 2 3 4 5 6 7 8 9 10 11 12 13
|
@Slf4j public class FailBackTolerantStrategy implements TolerantStrategy {
@Override public RpcResponse doTolerant(Map<String, Object> context, Exception e) { return null; } }
|
还有 FailOverTolerantStrategy
故障转移策略:
1 2 3 4 5 6 7 8 9 10 11 12 13
|
@Slf4j public class FailOverTolerantStrategy implements TolerantStrategy {
@Override public RpcResponse doTolerant(Map<String, Object> context, Exception e) { return null; } }
|
2、支持配置和扩展容错策略
一个成熟的 RPC 框架可能会支持多种不同的容错策略,像序列化器、注册中心、负载均衡器一样,我们的需求是,让开发者能够填写配置来指定使用的容错策略,并且支持自定义容错策略,让框架更易用、更利于扩展。
要实现这点,开发方式和序列化器、注册中心、负载均衡器都是一样的,都可以使用工厂创建对象、使用 SPI 动态加载自定义的注册中心。
1)容错策略常量。
在 fault.tolerant
包下新建 TolerantStrategyKeys
类,列举所有支持的容错策略键名。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
public interface TolerantStrategyKeys {
String FAIL_BACK = "failBack";
String FAIL_FAST = "failFast";
String FAIL_OVER = "failOver";
String FAIL_SAFE = "failSafe";
}
|
2)使用工厂模式,支持根据 key 从 SPI 获取容错策略对象实例。
在 fault.tolerant
包下新建 TolerantStrategyFactory
类,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
public class TolerantStrategyFactory {
static { SpiLoader.load(TolerantStrategy.class); }
private static final TolerantStrategy DEFAULT_RETRY_STRATEGY = new FailFastTolerantStrategy();
public static TolerantStrategy getInstance(String key) { return SpiLoader.getInstance(TolerantStrategy.class, key); }
}
|
这个类可以直接复制之前的 SerializerFactory,然后略做修改。可以发现,只要跑通了一次 SPI 机制,后续的开发就很简单。
3)在 META-INF
的 rpc/system
目录下编写容错策略接口的 SPI 配置文件,文件名称为 com.yupi.yurpc.fault.tolerant.TolerantStrategy
。
代码如下:
1 2 3 4
| failBack=com.yupi.yurpc.fault.tolerant.FailBackTolerantStrategy failFast=com.yupi.yurpc.fault.tolerant.FailFastTolerantStrategy failOver=com.yupi.yurpc.fault.tolerant.FailOverTolerantStrategy failSafe=com.yupi.yurpc.fault.tolerant.FailSafeTolerantStrategy
|
4)为 RpcConfig 全局配置新增容错策略的配置,代码如下:
1 2 3 4 5 6 7 8
| @Data public class RpcConfig {
private String tolerantStrategy = TolerantStrategyKeys.FAIL_FAST; }
|
3、应用容错功能
容错功能的应用非常简单,我们只需要修改 ServiceProxy 的部分代码,在重试多次抛出异常时,从工厂中获取容错策略并执行即可。
修改的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
RpcResponse rpcResponse; try { RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy()); rpcResponse = retryStrategy.doRetry(() -> VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo) ); } catch (Exception e) { TolerantStrategy tolerantStrategy = TolerantStrategyFactory.getInstance(rpcConfig.getTolerantStrategy()); rpcResponse = tolerantStrategy.doTolerant(null, e); } return rpcResponse.getData();
|
修改后的 ServiceProxy 的完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
|
public class ServiceProxy implements InvocationHandler {
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String serviceName = method.getDeclaringClass().getName(); RpcRequest rpcRequest = RpcRequest.builder() .serviceName(serviceName) .methodName(method.getName()) .parameterTypes(method.getParameterTypes()) .args(args) .build();
RpcConfig rpcConfig = RpcApplication.getRpcConfig(); Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry()); ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo(); serviceMetaInfo.setServiceName(serviceName); serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION); List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey()); if (CollUtil.isEmpty(serviceMetaInfoList)) { throw new RuntimeException("暂无服务地址"); }
LoadBalancer loadBalancer = LoadBalancerFactory.getInstance(rpcConfig.getLoadBalancer()); Map<String, Object> requestParams = new HashMap<>(); requestParams.put("methodName", rpcRequest.getMethodName()); ServiceMetaInfo selectedServiceMetaInfo = loadBalancer.select(requestParams, serviceMetaInfoList); RpcResponse rpcResponse; try { RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy()); rpcResponse = retryStrategy.doRetry(() -> VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo) ); } catch (Exception e) { TolerantStrategy tolerantStrategy = TolerantStrategyFactory.getInstance(rpcConfig.getTolerantStrategy()); rpcResponse = tolerantStrategy.doTolerant(null, e); } return rpcResponse.getData(); } }
|