并行Stream与Spring事务相遇会发生什么?
作者:??程序新视界???? 发布时间:2022-08-28 15:40:12
前言:
事情是这样的:运营人员反馈,通过Excel导入数据时,有一部分成功了,有一部分未导入。初步猜测,是事务未生效导致的。查看代码,发现导入部分已经通过@Transcational注解进行事务控制了,为什么还会出现事务不生效的问题呢?下面我们就进行具体的案例分析,Let's go!
事务不生效的代码
这里写一段简单的伪代码来演示展示一下事务不生效的代码:
@Transactional(rollbackFor = Exception.class)
public void batchInsert(List<Order> list) {
list.parallelStream().forEach(order -> orderMapper.save(order));
}
逻辑很简单,遍历list,然后批量插入Order数据到数据库。在该方法上使用@Transactional来声明出现异常时进行回滚。
但事实情况是,其中某一条数据执行异常时,事务并没有进行回滚。这到底是为什么呢?
下面一探究竟。
JDK 8 的Stream
上面代码中涉及到了两个知识点:parallelStream和@Transactional,我们先来铺垫一下parallelStream相关知识。
在JDK8 中引入了Stream API的概念和实现,这里的Stream有别于 InputStream 和OutputStream,Stream API 是处理对象流而不是字节流。
比如,我们可以通过如下方式来基于Stream进行实现:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.stream().forEach(num->System.out.println(num));
输出:1 2 3 4 5 6 7 8 9
代码看起来方便清爽多了。
关于Stream的基本处理流程如下:
在这些Stream API中,还提供了一个并行处理的API,也就是parallelStream。它可以将任务拆分子任务,分发给多个处理器同时处理,之后合并。这样做的目的很明显是为了提升处理效率。
parallelStream的基本使用方式如下:
// 并行执行流
list.stream().parallel().filter(e -> e > 10).count()
针对上述代码,对应的流程如下:
而parallelStream会将流划分成多个子流,分散到不同的CPU并行处理,然后合并处理结果。其中,parallelStream默认是基于ForkJoinPool.commonPool()线程池来实现并行处理的。
通常情况下,我们可以认为并行会比串行快,但还是有前提条件的:
处理器核心数量:并行处理核心数越多,处理效率越高;
处理数据量:处理数据量越大优势越明显;
但并行处理也面临着一系列的问题,比如:资源竞争、死锁、线程切换、事务、可见性、线程安全等问题。
@Transactional事务处理
上面了解了parallelStream的基本原理及特性之后,再来看看@Transactional的事务处理特性。
@Transactional是Spring提供的基于注解的一种声明式事务方式,该注解只能运用到public的方法上。
基本原理:当一个方法被@Transactional注解之后,Spring会基于AOP在方法执行之前开启一个事务。当方法执行完毕之后,根据方法是否报错,来决定回滚或提交事务。
在默认代理模式下,只有目标方法由外部方法调用时,才能被Spring的事务 * 拦截。所以,在同一个类中的两个方法直接调用,不会被Spring的事务 * 拦截。这是事务不生效的一个场景,但在上述案例中,并不存在这种情况。
Spring在处理事务时,会从连接池中获得一个jdbc connection,将连接绑定到线程上(基于ThreadLocal),那么同一个线程中用到的就是同一个connection了。具体实现在DataSourceTransactionManager#doBegin方法中。
Bug综合分析
在了解了parallelStream和@Transactional的相关知识之后,我们会发现:parallelStream处理时开启了多线程,而@Transactional在处理事务时会(基于ThreadLocal)将连接绑定到当前线程,由于@Transactional绑定管理的是主线程的事务,而parallelStream开启的新的线程与主线程无关。因此,事务也就无效了。
此时,将parallelStream改为普通的stream,事务可正常回滚。这就提示我们,在使用基于@Transactional方式管理事务时,慎重使用多线程处理。
问题拓展
虽然parallelStream带来了更高的性能,但也要区分场景进行使用。即便是在不需要事务管理的情况下,如果parallelStream使用不当,也会造成同一时间对数据库发起大量请求等问题。
因此,在stream与parallelStream之间进行选择时,还要考虑几个问题:
是否需要并行?数据量比较大,处理器核心数比较多的情况下才会有性能提升。
任务之间是否是独立的,是否会引起任何竞态条件?比如:是否共享变量。
执行结果是否取决于任务的调用顺序?并行执行的顺序是不确定的。
小结
文章讲述的Bug虽然简单,但如果不了解parallelStream与@Transactional注解的特性,还是很难排查的。而且也让我们意识到,虽然Spring通过@Transactional将事务管理进行了简化处理,但作为开发者,还是需要深入了解一下它的基本运作原理。不然,在排查bug时,很容易踩坑。
来源:https://juejin.cn/post/7036123712453607454


猜你喜欢
- 消息的可靠投递在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用
- 本文实例为大家分享了Java实现学生管理系统的具体代码,供大家参考,具体内容如下package BookDemo_1; import jav
- 好久没有写文章了,年前公司新开了一个项目,是和usb转串口通信相关的,需求是用安卓平板通过usb转接后与好几个外设进行通信,一直忙到最近,才
- 一、return语句执行顺序finally语句是在return语句执行之后,return语句返回之前执行的package exception
- C#动态webservice调用接口using System;using System.Collections;using System.I
- 前言一般在c++中没有使用的变量会有警告,C#中也有,在QT中我们利用Q_UNSED可以直接消除这些警告,那么我们在C#中该如何做才能消除这
- Web基础和HTTP协议┌─────────┐┌─────────┐
- 本文实例为大家分享了java获取当前时间年月日的具体代码,供大家参考,具体内容如下import java.text.ParseExcepti
- 在项目中有事需要对值为NULL的对象中Field不做序列化输入配置方式如下:[配置类型]:源码包中的枚举类:public static en
- Java 线程对比Thread,Runnable,Callablejava 使用 Thread 类代表线程,所有现场对象都必须是 Threa
- 1.实现阴影或模糊边效果方式:2.通过shape来实现,具体是通过layer-list 多层叠放的方式实现的<?xml version
- 一、原理区别:Java * 是利用反射机制生成一个实现代理接口的匿名类,在调用具体方法前调用InvokeHandler来处理。而cglib
- Java中字符串中子串的查找共有四种方法,如下:1、int indexOf(String str) :返回第一次出现的指定子字符串在此字符串
- 1 ArrayList在集合框架中,ArrayList是一个普通的类,实现了List接口,具体框架图如下:说明:ArrayList实现了Ra
- 在上一篇文章中,我们之前对BarChart.lerp的定义并不是高效的,我们正在创建的Bar实例,仅作为Bar.lerp的参数给出,并且针对
- 使用方法:先把mvcpager.dll引用加入mvc项目中。前台代码前台:@{Layout = null;}@using Webdiyer.
- 学了Android有一段时间了,一直没有时间写博客,趁着周末有点空,就把自己做的一些东西写下来. 一方面锻炼一下自己的写文档的能力,另一方面
- 一、问题说明偶然换了下spring boot的版本号,结果idea一直标红,报该父依赖一直找不到。但是当我查看引入的依赖时,版本号已经变成2
- 下面以launch方法为例进行分析。一.协程的创建launch方法的代码如下:// CoroutineScope的扩展方法public fu
- 本文实例为大家分享了android通过NFC读取卡号的具体代码,供大家参考,具体内容如下1.获取权限<uses-permission