使用@TransactionalEventListener监听事务教程
作者:shiliang_feng 发布时间:2023-10-05 02:50:44
@TransactionalEventListener监听事务
项目背景
最近在项目遇到一个问题
A方法体内有 INSERT、UPDATE或者DELETE操作,最后会发送一段MQ给外部,外部接收到MQ后会再发送一段请求过来,系统收到请求后会执行B方法,B方法会依赖A方法修改后的结果,这就有一个问题,如果A方法事务没有提交;且B方法的请求过来了会查询到事务未提交前的状态,这就会有问题
解决办法:@TransactionalEventListener
在Spring4.2+,有一种叫做TransactionEventListener的方式,能够控制在事务的时候Event事件的处理方式。 我们知道,Spring的发布订阅模型实际上并不是异步的,而是同步的来将代码进行解耦。而TransactionEventListener仍是通过这种方式,只不过加入了回调的方式来解决,这样就能够在事务进行Commited,Rollback…等的时候才会去进行Event的处理。
具体实现
//创建一个事件类
package com.qk.cas.config;
import org.springframework.context.ApplicationEvent;
public class MyTransactionEvent extends ApplicationEvent {
private static final long serialVersionUID = 1L;
private IProcesser processer;
public MyTransactionEvent(IProcesser processer) {
super(processer);
this.processer = processer;
}
public IProcesser getProcesser() {
return this.processer;
}
@FunctionalInterface
public interface IProcesser {
void handle();
}
}
//创建一个监听类
package com.qk.cas.config;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
@Component
public class MyTransactionListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void hanldeOrderCreatedEvent(MyTransactionEvent event) {
event.getProcesser().handle();
}
}
//MQ方法的变动
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendCreditResult(String applyNo, String jsonString) {
eventPublisher.publishEvent(new MyTransactionEvent(() -> {
LOGGER.info("MQ。APPLY_NO:[{}]。KEY:[{}]。通知报文:[{}]", applyNo, Queues.CREDIT_RESULT, jsonString);
rabbitTemplate.convertAndSend(Queues.CREDIT_RESULT, jsonString);
}));
}
拓展
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) 只有当前事务提交之后,才会执行事件监听的方法,其中参数phase默认为AFTER_COMMIT,共有四个枚举:
public enum TransactionPhase {
/**
* Fire the event before transaction commit.
* @see TransactionSynchronization#beforeCommit(boolean)
*/
BEFORE_COMMIT,
/**
* Fire the event after the commit has completed successfully.
* <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and
* therefore executes in the same after-completion sequence of events,
* (and not in {@link TransactionSynchronization#afterCommit()}).
* @see TransactionSynchronization#afterCompletion(int)
* @see TransactionSynchronization#STATUS_COMMITTED
*/
AFTER_COMMIT,
/**
* Fire the event if the transaction has rolled back.
* <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and
* therefore executes in the same after-completion sequence of events.
* @see TransactionSynchronization#afterCompletion(int)
* @see TransactionSynchronization#STATUS_ROLLED_BACK
*/
AFTER_ROLLBACK,
/**
* Fire the event after the transaction has completed.
* <p>For more fine-grained events, use {@link #AFTER_COMMIT} or
* {@link #AFTER_ROLLBACK} to intercept transaction commit
* or rollback, respectively.
* @see TransactionSynchronization#afterCompletion(int)
*/
AFTER_COMPLETION
}
注解@TransactionalEventListener
例如 用户注册之后需要计算用户的邀请关系,递归操作。如果注册的时候包含多步验证,生成基本初始化数据,这时候我们通过mq发送消息来处理这个邀请关系,会出现一个问题,就是用户还没注册数据还没入库,邀请关系就开始执行,但是查不到数据,导致出错。
@TransactionalEventListener 可以实现事务的监听,可以在提交之后再进行操作。
监听的对象
package com.jinglitong.springshop.interceptor;
import com.jinglitong.springshop.entity.Customer;
import org.springframework.context.ApplicationEvent;
public class RegCustomerEvent extends ApplicationEvent{
public RegCustomerEvent(Customer customer){
super(customer);
}
}
监听到之后的操作
package com.jinglitong.springshop.interceptor;
import com.alibaba.fastjson.JSON;
import com.jinglitong.springshop.entity.Customer;
import com.jinglitong.springshop.entity.MqMessageRecord;
import com.jinglitong.springshop.servcie.MqMessageRecordService;
import com.jinglitong.springshop.util.AliMQServiceUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@Component
@Slf4j
public class RegCustomerListener {
@Value("${aliyun.mq.order.topic}")
private String topic;
@Value("${aliyun.mq.regist.product}")
private String registGroup;
@Value("${aliyun.mq.regist.tag}")
private String registTag;
@Autowired
MqMessageRecordService mqMessageRecordService;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void hanldeRegCustomerEvent(RegCustomerEvent regCustomerEvent) {
Customer cust = (Customer) regCustomerEvent.getSource();
Map<String, String> map = new HashMap<String, String>();
map.put("custId", cust.getZid());
map.put("account", cust.getAccount());
log.info("put regist notice to Mq start");
String hdResult = AliMQServiceUtil.createNewOrder(cust.getZid(), JSON.toJSONString(map),topic,registTag,registGroup);
MqMessageRecord insert = buidBean(cust.getZid(),hdResult,registTag,JSON.toJSONString(map),registGroup);
if(StringUtils.isEmpty(hdResult)) {
insert.setStatus(false);
}else {
insert.setStatus(true);
}
mqMessageRecordService.insertRecord(insert);
log.info("put regist notice to Mq end");
log.info("regist notice userId : " + cust.getAccount());
}
private MqMessageRecord buidBean (String custId,String result ,String tag,String jsonStr,String groupId) {
MqMessageRecord msg = new MqMessageRecord();
msg.setFlowId(custId);
msg.setGroupName(groupId);
msg.setTopic(topic);
msg.setTag(tag);
msg.setMsgId(result);
msg.setDataBody(jsonStr);
msg.setSendType(3);
msg.setGroupType(1);
msg.setCreateTime(new Date());
return msg;
}
}
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
applicationEventPublisher.publishEvent(new RegCustomerEvent (XXX));
这样可以确保数据入库之后再进行异步计算
来源:https://my.oschina.net/u/4021946/blog/4915478


猜你喜欢
- 附GitHub源码:WebViewExplore一、WebView的基础配置WebSettings ws = getSettings();w
- RestAPI中, 经常需要操作json字符串, 需要把json字符串"反序列化"成一个对象, 也需要把一个
- 一、前言学习概述:学习四种不同类型的方法应用、方法被调用时的内存图、重载学习目标:熟练掌握方法的应用以及重载二、定义与调用1.概述定义:方法
- 简介最近学了java基础后对以前不会写的作业深有感触,想起以前各种在网上找资料找别人的代码参考,所以今天特地写了了简单的基于控制台的学生信息
- 前言本文尝试从What、Why、How这三个角度来探索Java中的弱引用,帮助大家理解Java中弱引用的定义、基本使用场景和使用方法。一、&
- 今天查看登录日志,发现http_x_forwarded_for获取到的ip地址有些是内网ip地址,有些则是公网和内网ip地址一起获取到,用逗
- 最大单词长度乘积给你一个字符串数组 words ,找出并返回 length(words[i]) * length(words[j]
- import java.io.BufferedReader;import java.io.File;import java.io.FileI
- 一、什么是JSONJSON(JavaScript Object Notation)是一种基于JavaScript语法子集的开放标准数据交换格
- 本文实例为大家分享了Android简单实现天气预报App的具体代码,供大家参考,具体内容如下一、UI设计首页UI<?xml versi
- 一、概述SPI(Service Provider Interface),是Java内置的一种服务提供发现机制,可以用来提高框架的扩展性,主要
- Process#waitFor()阻塞问题有时需要在程序中调用可执行程序或脚本命令:Process process = Runtime.ge
- 对流进行操作时要引用 using System.IO; 命名空间 FileStream常用的属性和方法:属性:CanRead 判断当前流是否
- 一、File类的概述和构造方法public class Fileextends Objectimplements Serializable,
- launch我们经常用,今天来看看它是什么原理。建议: 食用本篇文章之前记得先食用Kotlin协程之createCoroutine和star
- 本文参考文档Add Flutter to existing apps。首先有一个可以运行的原生项目第一步:新建Flutter moduleT
- 概述在Compose中,图片组件主要有两种,分别是显示图标的Icon组件和显示图片的Image组件,当我们显示一系列的小图标的时候,我们可以
- 这几年一直在做手机上和电视盒的App,几乎没有考虑过横竖屏切换的问题。电视盒好说,横屏不变,你要是给它设计个竖屏人家也没机会使;而手机上的应
- ViewPager是一个常用的Android组件,不过通常我们使用ViewPager的时候不能实现左右无限循环滑动,在滑到边界的时候会看到一
- github开源项目(Zxing)demo最快的调用Zxing方法1.关联第三方库2.调用基础的扫码3.获取返回值具体代码如下://1.默认