Spring Boot Reactor 整合 Resilience4j详析
作者:六七十三 发布时间:2021-08-08 10:30:02
1 引入 pom 包
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-all</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
</dependency>
2 配置说明
2.1 限流 ratelimiter
两个限流配置:backendA 1s 中最多允许 10 次请求;
backendB 每 500ms 最多允许 6 次请求。
resilience4j.ratelimiter:
instances:
backendA:
limitForPeriod: 10
limitRefreshPeriod: 1s
timeoutDuration: 10ms
registerHealthIndicator: true
eventConsumerBufferSize: 100
backendB:
limitForPeriod: 6
limitRefreshPeriod: 500ms
timeoutDuration: 3s
配置属性 | 默认值 | 描述 |
---|---|---|
timeoutDuration | 5【s】 | 一个线程等待许可的默认等待时间 |
limitRefreshPeriod | 500【ns】 | 限制刷新的周期。在每个周期之后,速率限制器将其权限计数设置回 limitForPeriod 值 |
limitForPeriod | 50 | 一个 limitRefreshPeriod (周期)允许访问的数量(许可数量) |
2.2 重试 retry
注意指定需要重试的异常,不是所有的异常重试都有效。比如 DB 相关校验异常,如唯一约束等,重试也不会成功的。
重试配置:
resilience4j.retry:
instances:
backendA:
maxAttempts: 3
waitDuration: 10s
enableExponentialBackoff: true
exponentialBackoffMultiplier: 2
retryExceptions:
- org.springframework.web.client.HttpServerErrorException
- java.io.IOException
backendB:
maxAttempts: 3
waitDuration: 10s
retryExceptions:
- org.springframework.web.client.HttpServerErrorException
- java.io.IOException
配置属性 | 默认值 | 描述 |
---|---|---|
maxAttempts | 3 | 最大重试次数(包括第一次) |
waitDuration | 500【ms】 | 两次重试之间的等待间隔 |
intervalFunction | numOfAttempts -> waitDuration | 修改失败后等待间隔的函数。默认情况下,等待时间是个常量。 |
retryOnResultPredicate | result->false | 配置一个判断结果是否应该重试的 predicate 函数。如果结果应该重试,Predicate 必须返回 true,否则它必须返回 false。 |
retryExceptionPredicate | throwable -> true | 和 retryOnResultPredicate 类似,如果要重试,Predicate 必须返回true,否则返回 false。 |
retryExceptions | 空 | 需要重试的异常类型列表 |
ignoreExceptions | 空 | 不需要重试的异常类型列表 |
failAfterMaxAttempts | false | 当重试达到配置的 maxAttempts 并且结果仍未通过 retryOnResultPredicate 时启用或禁用抛出 MaxRetriesExceededException 的布尔值 |
intervalBiFunction | (numOfAttempts, Either<throwable, result>) -> waitDuration | 根据 maxAttempts 和结果或异常修改失败后等待间隔时间的函数。与 intervalFunction 一起使用时会抛出 IllegalStateException。 |
2.3 超时 TimeLimiter
超时配置:
resilience4j.timelimiter:
instances:
backendA:
timeoutDuration: 2s
cancelRunningFuture: true
backendB:
timeoutDuration: 1s
cancelRunningFuture: false
超时配置比较简单,主要是配置 timeoutDuration 也就是超时的时间。
cancelRunningFuture 的意思是:是否应该在运行的 Future 调用 cancel 去掉调用。
2.4 断路器 circuitbreaker
断路器有几种状态:关闭、打开、半开。注意:打开,意味着不能访问,会迅速失败。
CircuitBreaker 使用滑动窗口来存储和汇总调用结果。您可以在基于计数的滑动窗口和基于时间的滑动窗口之间进行选择。基于计数的滑动窗口聚合最后 N 次调用的结果。基于时间的滑动窗口聚合了最后 N 秒的调用结果。
断路器配置:
resilience4j.circuitbreaker:
instances:
backendA:
// 健康指标参数,非断路器属性
registerHealthIndicator: true
slidingWindowSize: 100
配置属性 | 默认值 | 描述 |
---|---|---|
slidingWindowSize | 100 | 记录断路器关闭状态下(可以访问的情况下)的调用的滑动窗口大小 |
failureRateThreshold | 50(百分比) | 当失败比例超过 failureRateThreshold 的时候,断路器会打开,并开始短路呼叫 |
slowCallDurationThreshold | 60000【ms】 | 请求被定义为慢请求的阈值 |
slowCallRateThreshold | 100(百分比) | 慢请求百分比大于等于该值时,打开断路器开关 |
permittedNumberOfCalls | 10 | 半开状态下允许通过的请求数 |
maxWaitDurationInHalfOpenState | 0 | 配置最大等待持续时间,该持续时间控制断路器在切换到打开之前可以保持在半开状态的最长时间。 |
值 0 表示断路器将在 HalfOpen 状态下无限等待,直到所有允许的调用都已完成。
2.5 壁仓 bulkhead
resilience4j 提供了两种实现壁仓的方法:
SemaphoreBulkhead
使用 Semaphore 实现FixedThreadPoolBulkhead
使用有界队列和固定线程池实现
resilience4j.bulkhead:
instances:
backendA:
maxConcurrentCalls: 10
backendB:
maxWaitDuration: 10ms
maxConcurrentCalls: 20
resilience4j.thread-pool-bulkhead:
instances:
backendC:
maxThreadPoolSize: 1
coreThreadPoolSize: 1
queueCapacity: 1
2.5.1 SemaphoreBulkhead
配置属性 | 默认值 | 描述 |
---|---|---|
maxConcurrentCalls | 25 | 允许的并发执行的数量 |
maxWaitDuration | 0 | 尝试进入饱和隔板时线程应被阻止的最长时间 |
2.5.2 FixedThreadPoolBulkhead
配置属性 | 默认值 | 描述 |
---|---|---|
maxThreadPoolSize | Runtime.getRuntime().availableProcessors() | 线程池最大线程个数 |
coreThreadPoolSize | Runtime.getRuntime().availableProcessors()-1 | 线程池核心线程个数 |
queueCapacity | 100 | 线程池队列容量 |
keepAliveDuration | 20【ms】 | 线程数超过核心线程数之后,空余线程在终止之前等待的最长时间 |
3 使用
3.1 配置
在 application.yml 文件中添加以下 resilience4j 配置:
resilience4j.circuitbreaker:
instances:
backendA:
registerHealthIndicator: true
slidingWindowSize: 100
resilience4j.retry:
instances:
backendA:
maxAttempts: 3
waitDuration: 10s
enableExponentialBackoff: true
exponentialBackoffMultiplier: 2
retryExceptions:
- org.springframework.web.client.HttpServerErrorException
- java.io.IOException
backendB:
maxAttempts: 3
waitDuration: 10s
retryExceptions:
- org.springframework.web.client.HttpServerErrorException
- java.io.IOException
resilience4j.bulkhead:
instances:
backendA:
maxConcurrentCalls: 10
backendB:
maxWaitDuration: 10ms
maxConcurrentCalls: 20
resilience4j.thread-pool-bulkhead:
instances:
backendC:
maxThreadPoolSize: 1
coreThreadPoolSize: 1
queueCapacity: 1
resilience4j.ratelimiter:
instances:
backendA:
limitForPeriod: 10
limitRefreshPeriod: 1s
timeoutDuration: 10ms
registerHealthIndicator: true
eventConsumerBufferSize: 100
backendB:
limitForPeriod: 6
limitRefreshPeriod: 500ms
timeoutDuration: 3s
resilience4j.timelimiter:
instances:
backendA:
timeoutDuration: 2s
cancelRunningFuture: true
backendB:
timeoutDuration: 1s
cancelRunningFuture: false
3.2 使用注解实现
直接在需要限流的方法上增加注解@RateLimiter
实现限流;增加注解@Retry
实现重试;增加注解 @CircuitBreaker
熔断;增加注解 @Bulkhead
实现壁仓。name 属性中分别填写限流器、重试、熔断、壁仓组件的名字。
@Bulkhead(name = "backendA")
@CircuitBreaker(name = "backendA")
@Retry(name = "backendA")
@RateLimiter(name = "backendA")
public Mono<List<User>> list() {
long startTime = System.currentTimeMillis();
return Mono.fromSupplier(() -> {
return userRepository.findAll();
}).doOnError(e -> {
// 打印异常日志&增加监控(自行处理)
logger.error("list.user.error, e", e);
})
.doFinally(e -> {
// 耗时 & 整体健康
logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime);
});
}
@Bulkhead(name = "backendA")
@CircuitBreaker(name = "backendA")//最多支持10个并发量
@Retry(name = "backendA")//使用 backendA 重试器,如果抛出 IOException 会重试三次。
@RateLimiter(name = "backendA")// 限流 10 Qps
public Mono<Boolean> save(User user) {
long startTime = System.currentTimeMillis();
return Mono.fromSupplier(() -> {
return userRepository.save(user) != null;
})
.doOnError(e -> {
// 打印异常日志&增加监控(自行处理)
logger.error("save.user.error, user={}, e", user, e);
})
.doFinally(e -> {
// 耗时 & 整体健康
logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime);
});
}
注意:以上所有组件,都支持自定义。
来源:https://segmentfault.com/a/1190000042514542


猜你喜欢
- 前言任何一个服务如果没有监控,那就是两眼一抹黑,无法知道当前服务的运行情况,也就无法对可能出现的异常状况进行很好的处理,所以对任意一个服务来
- 一、什么是JSONJSON(JavaScript Object Notation)是一种基于JavaScript语法子集的开放标准数据交换格
- Win32 APIWin32 API即为Microsoft 32位平台的应用程序编程接口(Application Programming I
- 1、JavaBean介绍 * JavaBean的定义:JavaBeans是Java中一种特殊的类,可以将多个对象封装到一个对象(bean)
- 一、前言Android 实现卫星式菜单也叫弧形菜单,主要要做的工作如下:1.动画的处理2.自定义ViewGroup来实现卫星式菜单View(
- 摘要在使用java做后台站点的开发张,图表和报表功能都是不可或缺 的。本文推荐了8款最精彩实用的Java图表应用,大部分图表应用的功能都类似
- Android 设置颜色的方法总结Android中有几种设置界面背景及文字颜色的方法,下面由浅入深分别介绍Android中设置颜色的几种方法
- 我就废话不多说了,大家还是直接看代码吧~/* *es配置类 * */@Configurationpublic class ElasticSe
- 当只需要两个图像合并的时候,可以简单的使用gdi+,把两个图像画到一个画布上面实现合并bitmap.当需要将许多bitmap合并时,由于bi
- 加载yml配置文件的no字段自动转义项目上线了才发现一个字段被转义了,如下图:本来应该会拿到no字段和数据进行比对的,结果发现比对完的数据这
- 最近在做Android 应用开发,IDE是android studio , 使用的版本配置如下:compileSdk 32bui
- 以前在公司做项目的时候,遇到了分辨率的适配问题,说起来当时挺纠结的,因为没有外网,所以这个问题,都是黑暗中摸索的,尝试了许多方法,最后和徒弟
- AsyncTask,顾名思义,异步任务。说到异步,最简单的理解就是不同步。再复杂一点理解,就得举例子了。假设我要去火车站买票,刚到火车站我突
- 本文实例为大家分享了Java Web实现简易图书管理系统的具体代码,供大家参考,具体内容如下前言首先实现的是用户的登录注册,注册成功后自动跳
- Java 中的引用类型:强引用、软引用、弱引用和虚引用强引用如 Object object = new Object(),那 object
- 本文实例讲述了C#使用WebClient登录网站并抓取登录后的网页信息实现方法。分享给大家供大家参考,具体如下:C#登录网站实际上就是模拟浏
- private void btnSave_Click(object sender, RoutedEventArgs e)
- using System;using System.Collections.Generic;using System.ComponentMo
- Android存储方式有很多种,在这里所用的存储方式是SharedPreferrences,其采用了Map数据结构来存储数据,以键值的方式存
- 一、首先在Application的onCreate中写:// GeneralAppliction.javapublic static IWX