关于springboot响应式编程整合webFlux的问题
作者:pshdhx_albert 发布时间:2023-12-07 07:25:55
在servlet3.0标准之前,是每一个请求对应一个线程。如果此时一个线程出现了高延迟,就会产生阻塞问题,从而导致整个服务出现严重的性能情况,因为一旦要调用第三方接口,就有可能出现这样的操作了。早期的处理方式只能是手工控制线程。
在servlet3.0标准之后,为了解决此类问题,所以提供了异步响应的支持。在异步响应处理结构中,可以将耗时操作的部分交由一个专属的异步线程进行响应处理,同时请求的线程资源将被释放,并将该线程返回到线程池中,以供其他用户使用,这样的操作机制将极大的提升程序的并发性能。
对于以上给出的响应式编程支持,仅仅是一些原生的支持模式,而现在既然基于springboot程序开发,那么就需要考虑一些更简单的整合。
而在spring中实现响应式编程,那么则需要使用到spring webFlux,该组件是一个重新构建的且基于Reactive Streams标准实现的异步非阻塞Web开发框架,以Reactor开发框架为基础,可以更加容易实现高并发访问下的请求处理模型。在springboot2.x版本中提供了webFlux依赖模块,该模块有两种模型实现:一种是基于功能性端点的方式,另一种是基于SpringMVC注解方式。
Maven引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
整合处理器:
package com.example.oldguy.myWebFlux.handler;
import com.example.oldguy.myVo.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
@Component
@Slf4j
public class MessageHandler {
public Mono<Message> echoHandler(Message message){
log.info("【{}】业务层接收处理数据:{}",Thread.currentThread().getName());
message.setTitle("【】"+Thread.currentThread().getName()+"】"+message.getTitle());
message.setContent("【】"+Thread.currentThread().getName()+"】"+message.getContent());
return Mono.create(item->item.success(message)); //实现数据响应
}
}
整合控制器:
package com.example.oldguy.myController;
import com.example.oldguy.myVo.Message;
import com.example.oldguy.myWebFlux.handler.MessageHandler;
import com.example.oldguy.mytask.MyThreadTask;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.WebDataBinder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.InitBinder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.http.HttpServletRequest;
import java.beans.PropertyEditorSupport;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* 异步线程的处理机制
*/
@RestController
@RequestMapping("/message/*")
@Slf4j
@Api(tags = "异步处理")
public class AsyncController {
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
private MyThreadTask task;
private MessageHandler messageHandler;
/**
* 日期转换
* @param
* @return
*/
private static final DateTimeFormatter LOCAL_DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");
@InitBinder
public void initBinder(WebDataBinder binder){
binder.registerCustomEditor(Date.class,new PropertyEditorSupport(){
@Override
public void setAsText(String text) throws IllegalArgumentException {
LocalDate localDate = LocalDate.parse(text,LOCAL_DATE_FORMAT);
Instant instant = localDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant();
super.setValue(Date.from(instant));
}
});
}
@GetMapping("runnable")
@ApiOperation("异常处理Runnable")
public Object message(String message) {
log.info("外部线程:{}", Thread.currentThread().getName());
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
DeferredResult<String> result = new DeferredResult<>(6000L); //设置异步响应
this.threadPoolTaskExecutor.execute(new Runnable() { //线程核心任务
@SneakyThrows
public void run() {
log.info("内部线程:{}",Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(7);
result.setResult("[echo]"+message); //执行最终的响应
result.onCompletion(new Runnable() { //完成处理线程
log.info("完成线程:{}",Thread.currentThread().getName()); //日志输出
result.onTimeout(new Runnable() {
log.info("超时线程:{}",Thread.currentThread().getName());
result.setResult("【请求超时】"+request.getRequestURI()); //超时路径
return result;
@GetMapping("task")
@ApiOperation("task异步任务开启")
public Object messageTask(String message){
log.info("外部线程{}",Thread.currentThread().getName());
this.task.startTaskHander();
return "【echo】"+message;
@GetMapping("webflux")
@ApiOperation("整合webflux")
public Object echo(Message message){
log.info("接收用户信息,用户方发送的参数为message={}",message);
return this.messageHandler.echoHandler(message);
}
页面响应:
控制台响应:
2021-11-30 15:04:06.946 INFO 22884 --- [nio-1999-exec-1] c.e.oldguy.myController.AsyncController : 接收用户信息,用户方发送的参数为message=Message(title=pansd, pubdate=Tue Nov 30 00:00:00 CST 2021, content=come on baby)
2021-11-30 15:04:06.947 INFO 22884 --- [nio-1999-exec-1] c.e.o.myWebFlux.handler.MessageHandler : 【http-nio-1999-exec-1】业务层接收处理数据:Message(title=pansd, pubdate=Tue Nov 30 00:00:00 CST 2021, content=come on baby)
webFlux响应map和List
//webFlux响应集合
public Flux<Message> list(Message message){
List<Message> messageList = new ArrayList<>();
for(int i=0;i<10;i++){
Message m = new Message();
m.setTitle(i+"--"+message.getTitle());
m.setContent(i+"--"+message.getContent());
m.setPubdate(message.getPubdate());
messageList.add(m);
}
return Flux.fromIterable(messageList);
}
public Flux<Map.Entry<String,Message>> map(Message message){
Map<String,Message> map = new HashMap<>();
for(int i=0;i<10;i++){
Message m = new Message();
m.setTitle(i+"--"+message.getTitle());
m.setContent(i+"--"+message.getContent());
m.setPubdate(message.getPubdate());
map.put("pansd-"+i,m);
}
// Set<Map.Entry<String, Message>> entries = map.entrySet();
return Flux.fromIterable(map.entrySet());
}
@GetMapping("webfluxList")
@ApiOperation("整合webfluxList")
public Object echoList(Message message){
log.info("接收用户信息,用户方发送的参数为message={}",message);
return this.messageHandler.list(message);
}
@GetMapping("webfluxMap")
@ApiOperation("整合webfluxMap")
public Object echoMap(Message message){
log.info("接收用户信息,用户方发送的参数为message={}",message);
return this.messageHandler.map(message);
}
来源:https://blog.csdn.net/pshdhx/article/details/121636277


猜你喜欢
- 定时器问题定时器属于基本的基础组件,不管是用户空间的程序开发,还是内核空间的程序开发,很多时候都需要有定时器作为基础组件的支持。一个定时器的
- 本文实例为大家分享了C#实现简单的计算器功能的具体代码,供大家参考,具体内容如下环境:VS2010及以上版本1、建立个Window窗体应用2
- MD5加密简介哈希算法又称散列算法,是将任何数据转换成固定长度的算法的统称。 从本质上讲,MD5也是一种哈希算法,其输出是生成12
- 默认日志 Logback :默认情况下,Spring Boot会用Logback来记录日志,并用INFO级别输出到控制台。在运行应用程序和其
- Java是一门天然的面向对象的语言。而所有我们手动创造出来的类,都继承于同一个类,即Object类。可以看一下Object类的结构nativ
- 一.内部类的介绍 内部类: 一个类定义在 另一个类 的 内部。 &
- springboot2.x暴露健康状况通过prometheus监控加入依赖 <!--prometheus监控 https://prom
- 本文实例讲述了Java swing框架实现的贪吃蛇游戏。分享给大家供大家参考,具体如下:java是门高级语言,做游戏时适合做后台,但是用它也
- 1)首先启动hadoop2个进程,进入hadoop/sbin目录下,依次启动如下命令[root@node02 sbin]# pwd/usr/
- 背景我在准备使用 JVM 的命令时候观察程序的动态,但是发现 Main 函数启动就退出了,所以也没办法直接观察,于是想到了如何让 Main
- 我们知道Android手机操作系统采用的是Linux内核,Linux中最高的系统权限就是Root,这就类似与Windows中的Adminis
- 【1】阻塞队列一、什么是阻塞队列?① 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。② 支持阻塞的移除方法:
- 本文主要介绍Java中的rmi的基本使用1:项目架构api:主要是接口的定义,url地址,端口号rmiconsumer:rmi服务的调用者r
- 使用Socket实现多人聊天应用,供大家参考,具体内容如下 在讲scoket通信器先可以先了解一下网络协议手机能够使用联网功能是因
- 1、设置ssh安装ssh相关软件包:sudo apt-get install openssh-client openssh-server然后
- 内存分配方式简介在C++中,内存分成5个区,他们分别是堆、栈、自由存储区、全局/静态存储区和常量存储区。栈:在执行函数时,函数内局部变量的存
- 前言工作中你可能会遇到很多这样的场景,一个接口,要从其他几个service调用查询方法,分别获取到需要的值之后再封装数据返回。还可能在微服务
- 1、将 Jmeter 下 extras 目录中 ant-jmeter-1.1.1.jar 包拷贝至 ant 安装目录下的lib目录中,否则会
- 本文实例为大家分享了用JavaMail发送HTML模板邮件的具体代码,供大家参考,具体内容如下依赖<dependency>&nb
- 之前使用Retrofit都是将JSON串转化为POJO对象,针对不同的业务协议,定义相应的接口和参数列表。但是此种方式一般用在自己内部协议基