Java @GlobalLock注解详细分析讲解
作者:氵奄不死的鱼 发布时间:2023-06-03 03:55:53
GlobalLock的作用
对于某条数据进行更新操作,如果全局事务正在进行,当某个本地事务需要更新该数据时,需要使用@GlobalLock确保其不会对全局事务正在操作的数据进行修改。防止的本地事务对全局事务的数据脏写。如果和select for update组合使用,还可以起到防止脏读的效果。
全局锁
首先我们知道,seata的AT模式是二段提交的,而且AT模式能够做到事务ACID四种特性中的A原子性和D持久性,默认情况下隔离级别也只能保证在读未提交
那么为了保证原子性,在全局事务未提交之前,其中被修改的数据会被加上全局锁,保证不再会被其他全局事务修改。
为什么要使用GlobalLock
但是全局锁仅仅能防止全局事务对一个上锁的数据再次进行修改,在很多业务场景中我们是没有跨系统的rpc调用的,通常是不会加分布式事务的。
例如有分布式事务执行完毕A系统的业务逻辑,正在继续执行B系统逻辑,并且A系统事务已经提交。此时A系统一个本地的spring事务去与分布式事务修改同一行数据,是可以正常修改的
由于本地的spring事务并不受seata的全局锁控制容易导致脏写,即全局事务修改数据后,还未提交,数据又被本地事务改掉了。这很容易发生数据出错的问题,而且十分有可能导致全局事务回滚时发现 数据已经dirty(与uodoLog中的beforeImage不同)。那么就会回滚失败,进而导致全局锁无法释放,后续的操作无法进行下去。也是比较严重的问题。
一种解决办法就是,针对所有相关操作都加上AT全局事务,但这显然是没必要的,因为全局事务意味者需要与seata-server进行通信,创建全局事务,注册分支事务,记录undoLog,判断锁冲突,注册锁。
那么对于不需要跨系统,跨库的的业务来说,使用GlobalTransactional实在是有点浪费了
那么更加轻量的GlobalLock就能够发挥作用了,其只需要判断本地的修改是否与全局锁冲突就够了
工作原理
加上@GlobalLock之后,会进入切面
io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke
进而进入这个方法,处理全局锁
Object handleGlobalLock(final MethodInvocation methodInvocation,
final GlobalLock globalLockAnno) throws Throwable {
return globalLockTemplate.execute(new GlobalLockExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
@Override
public GlobalLockConfig getGlobalLockConfig() {
GlobalLockConfig config = new GlobalLockConfig();
config.setLockRetryInternal(globalLockAnno.lockRetryInternal());
config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
return config;
}
});
}
进入execute方法
public Object execute(GlobalLockExecutor executor) throws Throwable {
boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
if (!alreadyInGlobalLock) {
RootContext.bindGlobalLockFlag();
}
// set my config to config holder so that it can be access in further execution
// for example, LockRetryController can access it with config holder
GlobalLockConfig myConfig = executor.getGlobalLockConfig();
GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
try {
return executor.execute();
} finally {
// only unbind when this is the root caller.
// otherwise, the outer caller would lose global lock flag
if (!alreadyInGlobalLock) {
RootContext.unbindGlobalLockFlag();
}
// if previous config is not null, we need to set it back
// so that the outer logic can still use their config
if (previousConfig != null) {
GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
} else {
GlobalLockConfigHolder.remove();
}
}
}
}
先判断当前是否已经在globalLock范围之内,如果已经在范围之内,那么把上层的配置取出来,用新的配置替换,并在方法执行完毕时候,释放锁,或者将配置替换成之前的上层配置
如果开启全局锁,会在threadLocal put一个标记
//just put something not null
CONTEXT_HOLDER.put(KEY_GLOBAL_LOCK_FLAG, VALUE_GLOBAL_LOCK_FLAG);
开始执行业务方法
那么加上相关GlobalLock标记的和普通方法的区别在哪里?
我们都知道,seata会对数据库连接做代理,在生成PreparedStatement时会进入
io.seata.rm.datasource.AbstractConnectionProxy#prepareStatement(java.lang.String)
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
String dbType = getDbType();
// support oracle 10.2+
PreparedStatement targetPreparedStatement = null;
if (BranchType.AT == RootContext.getBranchType()) {
List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
}
}
}
if (targetPreparedStatement == null) {
targetPreparedStatement = getTargetConnection().prepareStatement(sql);
}
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}
这里显然不会进入AT模式的逻辑,那么直接通过真正的数据库连接,生成PreparedStatement,再使用PreparedStatementProxy进行包装,代理增强
在使用PreparedStatementProxy执行sql时,会进入seata定义的一些逻辑
public boolean execute() throws SQLException {
return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
}
最终来到
io.seata.rm.datasource.exec.ExecuteTemplate#execute(java.util.List<io.seata.sqlparser.SQLRecognizer>, io.seata.rm.datasource.StatementProxy, io.seata.rm.datasource.exec.StatementCallback<T,S>, java.lang.Object…)
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
String dbType = statementProxy.getConnectionProxy().getDbType();
if (CollectionUtils.isEmpty(sqlRecognizers)) {
sqlRecognizers = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
dbType);
}
Executor<T> executor;
if (CollectionUtils.isEmpty(sqlRecognizers)) {
executor = new PlainExecutor<>(statementProxy, statementCallback);
} else {
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
}
T rs;
try {
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException) ex;
}
return rs;
}
如果当前线程不需要锁并且不不在AT模式的分支事务下,直接使用原生的preparedStatement执行就好了
这里四种操作,通过不同的接口去执行,接口又有多种不同的数据库类型实现
插入分为不同的数据库类型,通过spi获取
seata提供了三种数据库的实现,
update,delete,select三种没有多个实现类
他们在执行时都会执行父类的方法
io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitTrue
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
connectionProxy.changeAutoCommit();
return new LockRetryPolicy(connectionProxy).execute(() -> {
T result = executeAutoCommitFalse(args);
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback();
}
throw e;
} finally {
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
全局锁的策略, 是在一个while(true)循环里不断执行
protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
LockRetryController lockRetryController = new LockRetryController();
while (true) {
try {
return callable.call();
} catch (LockConflictException lockConflict) {
onException(lockConflict);
lockRetryController.sleep(lockConflict);
} catch (Exception e) {
onException(e);
throw e;
}
}
}
如果出现异常是LockConflictException,进入sleep
public void sleep(Exception e) throws LockWaitTimeoutException {
if (--lockRetryTimes < 0) {
throw new LockWaitTimeoutException("Global lock wait timeout", e);
}
try {
Thread.sleep(lockRetryInternal);
} catch (InterruptedException ignore) {
}
}
这两个变量就是@GlobalLock注解的两个配置,一个是重试次数,一个重试之间的间隔时间。
继续就是执行数据库更新操作
io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse
发现这里也会生成,undoLog,beforeImage和afterImage,其实想想,在GlobalLock下,是没必要生成undoLog的。但是现有逻辑确实要生成,这个seata后续应该会优化。
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}
生成beforeImage和aferImage的逻辑也比较简单。分别在执行更新前,查询数据库,和更新后查询数据库
可见记录undoLog是十分影响性能的,查询就多了两次,如果undoLog入库还要再多一次入库操作。
再看prepareUndoLog
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
return;
}
if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
if (beforeImage.getRows().size() != afterImage.getRows().size()) {
throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
}
}
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
String lockKeys = buildLockKey(lockKeyRecords);
if (null != lockKeys) {
connectionProxy.appendLockKey(lockKeys);
SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
connectionProxy.appendUndoLog(sqlUndoLog);
}
}
将lockKeys,和undoLog,暂时记录在connectionProxy中,也就是说至此还没有将uodoLog记录到数据库,也没有判断全局锁,这些事情都留到了事务提交
io.seata.rm.datasource.ConnectionProxy#doCommit
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
进入io.seata.rm.datasource.ConnectionProxy#processLocalCommitWithGlobalLocks
这个 方法很简单就是首先进行锁的检查,并没有我想象中的加索全局事务。
private void processLocalCommitWithGlobalLocks() throws SQLException {
checkLock(context.buildLockKeys());
try {
targetConnection.commit();
} catch (Throwable ex) {
throw new SQLException(ex);
}
context.reset();
}
也就是说,使用GlobalLock会对全局锁检测,但是并不会对记录加全局锁。但是配合全局事务这样已经能够保证全局事务的原子性了。可见GlobalLock还是要和本地事务组合一起使用的,这样才能保证,GlobalLock执行完毕本地事务未提交的数据不会被别的本地事务/分布式事务修改掉。
来源:https://blog.csdn.net/qq_37436172/article/details/126689639


猜你喜欢
- 基本概念Semaphore也是一个线程同步的辅助类,可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制并发访问
- 一、说明 添加视图文件的时候有两种方式:1、通过在xml文件定义layout;2、java代码编写二、前言说明1.构造xml文件2.Layo
- mybatis初始化SqlSessionFactory失败总结原因有几点1.resources中的xml配置文件放错位置或者是放的太深加载不
- 如下所示://定义二维数组写法1 class numthree{public static void main(String[] args)
- 微服务通过feign.RequestInterceptor传递参数Feign 支持请求 * ,在发送请求前,可以对发送的模板进行操作,例如设
- 本文实例为大家分享了java实现查找替换功能的具体代码,供大家参考,具体内容如下查找if(searchTxt.getText().equal
- 最近要实现一个功能,就是checkbox跨页多选,在网上看了一下,资料很少,而且大多是不完全的。不过经过我的努力,终于做出来了。
- 使用JAVA工程管理越来越多的jar包,担心导错了,多导了,漏导了怎么办?换一个IDE项目后项目会不会出一堆BUG,看的头皮发麻?自己写的代
- 本文实例汇总了C#路径,文件,目录及IO常见操作。分享给大家供大家参考。具体如下:问题1:如何判定一个给定的路径是否有效/合法;通过Path
- 看下效果图主要看下自定义view 代码public class ProcessImageView extends ImageView{ &
- 在项目中遇到需要批量更新的功能,原本想的是在Java中用循环访问数据库去更新,但是心里总觉得这样做会不会太频繁了,太耗费资源了,效率也很低,
- Java原生SPI面向接口编程+策略模式实现建立接口Robotpublic interface Robot { /
- 前言MyBatis常用标签及标签使用技巧MyBatis的常用标签有很多,比如<sql id="">:预定义可
- 前言这是用testng框架加selenium做的一个UI自动化测试的项目Java代码package com.justin;/**?* @au
- step1:先移除centos自带的jdkrpm -qa|grep javarpm -e --nodeps xxstep2:安装jdk (所
- 本文介绍了Android EasyBarrage实现轻量级弹幕效果,分享给大家,具体如下:概述EasyBarrage是Android平台的一
- 在Java中对集合进行操作时,有时候需要对类中的equals() 和 hashCode()进行方法重写.IDEA中实现了利用快捷键即可对上述
- 示例 1 :使用搜索表单创建全屏模式我们要构建的小应用程序有一个应用程序栏,右侧有一个搜索按钮。按下此按钮时,将出现一个全屏模式对话框。它不
- 1. 初始 Spring Boot1.1 什么是Spring BootSpring 的诞生是为了简化 Java 程序的开发的Spring B
- 今天学习了Spinner组件的使用,非常好用的一款组件,相当于从下拉列表中选择项目,今天收获颇多,下面给大家演示一下Spinner的使用(分