spring详细事务源码解析——事务怎么挂起的?传播等级怎么实现的?保存点、内嵌事务怎么做回滚和提交?
零、引言
spring 事务怎么挂起的?传播等级怎么实现的?保存点、内嵌事务怎么做回滚和提交?如果你对这些感兴趣,或者想要了解它的运行机制,则可以参考此文章 ~
(打个广告,深圳或者其他城市也行,找工作 = =!有坑位请私信我 )
一、spring 事务大局纵览
上篇文章主要提到了 spring 如何去管理 mybatis 的事务,实际上就是借助 TransactionSynchronizationManager
来保存线程级别的连接对象,sqlSession对象,但我们没有过多提及 spring 本身的实现如何,我们先回顾一下几个主要的点。
- 在创建 MapperProxy 时,spring 为其注入了一个 sqlSession 用于 sql执行,但是这个 sqlSession 是一个代理对象,叫做 sqlSessionTemplate,它会自动选择我们该使用哪个 sqlSession 去执行
- 在执行时,spring 切面在执行事务之前,会创建一个叫做 TransactionInfo 的对象,此对象会根据事务传播等级来控制是否创建新连接,是否挂起上一个连接,将信息保存在 TransactionSynchronizationManager
- 到了真正需要创建或者获取 sqlSession 时,spring 重写的 TransactionFactory 会优先去 TransactionSynchronizationManager 中拿连接对象。
上篇文章我们也零零碎碎讲到了 spring 事务管理的几个重要的接口,我们先从 TransactionInfo
这个类,它就是事务执行的上下文,TransactionInfo
的重要程度不言而喻。它包含了这么几个对象:
PlatformTransactionManager
事务进行几乎所有进行时操作的核心逻辑就在这里面,它有几个子类,我们主要还是讨论DataSourceTransactionManager
这个子类,也就是对 JDBC 连接的管理。TransactionDefition
事务在开启之前的一些配置信息,比如配置传播等级,隔离级别,超时控制等等。TransactionStatus
事务的运行时信息基本都在这里面,比如连接信息,挂起的事务的信息(保存点),事务跑没跑完等等。TransactionInfo - old
,每个 info 会包含它上个挂起事务的引用,可能为空。- Joinpointxxx 这个无所谓,debug 用的,包含了事务切面的信息的一个字符串。
还是从代码入手:也就是事务的执行纵览:
-- 代码位于 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction --@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final TransactionManager tm = determineTransactionManager(txAttr);
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); // 创建或者拿到当前事务信息
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();// 执行切面
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);// 回滚逻辑
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);// 拿回上一个事务
}
commitTransactionAfterReturning(txInfo);// 提交当前事务
return retVal;
}
}
上述代码逻辑还是很清晰的
- 先从
TransactionDefinition(TransactionAttribute)
开始,看看当前这个切面需不需要执行事务, - 如果能获取到,则通过其获取到合适的
PlatformTransactionManager
,创建或者获取、封装到我们的TransactionInfo
。 - 执行切面,再根据切面情况选择提交或者回滚。
我们知道事务最终都会创建一个 TransactionInfo
,这个对象创建的最后一步挺有意思:
-- 代码位于 org.springframework.transaction.interceptor.TransactionAspectSupport#prepareTransactionInfo ---- 是方法 createTransactionIfNecessary 的最后一步 --
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
// We always bind the TransactionInfo to the thread, even if we didn"t create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
txInfo.bindToThread();
return txInfo;
}
private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
new NamedThreadLocal<>("Current aspect-driven transaction");
private void bindToThread() {
// Expose current TransactionStatus, preserving any existing TransactionStatus
// for restoration after this transaction is complete.
this.oldTransactionInfo = transactionInfoHolder.get();
transactionInfoHolder.set(this);
}
我们发现它这里有一个 ThreadLocal
,它会将之前已经保存在这里的 TransactionInfo
拿出来,放到刚才上面提到的 old
里面持有,而当前这个 TransactionInfo
对象则会扔进这个 ThreadLocal
里面,然后在执行完切面后把 old
放回去,形成了一个事务栈:
protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) { if (txInfo != null) {
txInfo.restoreThreadLocalStatus();
}
}
private void restoreThreadLocalStatus() {
// Use stack to restore old transaction TransactionInfo.
// Will be null if none was set.
transactionInfoHolder.set(this.oldTransactionInfo);
}
二、TransactionStatus 源码解析
TransactionDefinition
基于配置,或者我们注解获取到的,其中又定义了该使用哪个 PlatformTransactionManager
,这些相对来说都比较简单。而 TransactionStatus
事务运行时这个最重要的对象的创建与使用却是比较复杂的。
它的创建大体可以分为如下流程:
2.1、创建
它的创建是比较简单的,没有用工厂,直接创建即可。
public DefaultTransactionStatus( @Nullable Object transaction, boolean newTransaction, boolean newSynchronization,
boolean readOnly, boolean debug, @Nullable Object suspendedResources) {
this.transaction = transaction;
this.newTransaction = newTransaction;
this.newSynchronization = newSynchronization;
this.readOnly = readOnly;
this.debug = debug;
this.suspendedResources = suspendedResources;
}
transaction 是里面一个比较重要的对象,它内置了连接对象,下面是 Transaction
对象的获取过程:
DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
我们可以看到核心的就是 ConnectionHolder
,这个对象尤为重要,注意,这里并没有建立连接,而且也可能获取到空的 connectionHolder
,连接的包裹对象 ConnectionHolder
可以控制两个事物到底是不是用的同一个连接(同一个真正的事务,一起rollback 一起commit 的那种)。
此对象存放在一个叫做 TransactionSynchronizationManager
的工具单例类里面,它内部持有非常多个 ThreadLocal
来存放事务信息。
2.2、连接初始化
上面说了没有真正建立连接,建立连接是在 doBegin()
的方法里做的:
/** * This implementation sets the isolation level but ignores the timeout.
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
// 第一步,根据是否受事务管理,或者有没有 connectionHolder 对象去创建新连接,是否获取一个新连接
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 设置此事务已经受事务管理
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 设置一些 `TransactionDefinition` 过来的属性,比如隔离等级,传播等级,是否只读等等,并将其设置为可用。
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don"t want to do it unnecessarily (for example if we"ve explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
//--------------------------------------------------------------------------------------
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
}
代码很长但是很简单,从上到下依次为:
- 如果没有
TransactionHolder
或者不属于其他事务管理,则创建一个新连接obtainDataSource().getConnection();
,并且把新创建的连接放入connectionHoler
。 - 将其设置为已经受事务同步管理
setSynchronizedWithTransaction(true)
- 设置一些
TransactionDefinition
过来的属性,比如隔离等级,传播等级,是否只读等等,并将其设置为可用。 - 把它塞入或者塞回
TransactionSynchronizationManager
的ThreadLocal
里面,也就是将connectionHolder
与当前线程绑定。
2.3、对 TransactionStatus
进行配置
方法如下,非常简单,但是十分重要:
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
我们发现又是 TransactionSynchronizationManager
,这个事务信息保存对象不仅仅保存 connectionHolder
,还保存它是否是一个事务,setActualTransactionActive
,比如我们有些传播等级是有事务则使用事务,没有就不适用,spring 还是会给它准备一个 TransactionInfo
,但是里面的实现基本上是空的。
后面是设置一些传播等级,名字,是否只读之类的属性,最后在 initSynchronization()
方法里面,创建了一个叫做 synchronizations
的 linkedHashSet<TransactionSynchronization>
,划重点,这个东西类似一个监听器,在触发事务的某些行为时会被调用,比如被挂起,被重新使用等等:
/** * Suspend this synchronization.
* Supposed to unbind resources from TransactionSynchronizationManager if managing any.
* @see TransactionSynchronizationManager#unbindResource
*/
default void suspend() {
}
/**
* Resume this synchronization.
* Supposed to rebind resources to TransactionSynchronizationManager if managing any.
* @see TransactionSynchronizationManager#bindResource
*/
default void resume() {
}
/**
* Flush the underlying session to the datastore, if applicable:
* for example, a Hibernate/JPA session.
* @see org.springframework.transaction.TransactionStatus#flush()
*/
@Override
default void flush() {
}
/**
* Invoked before transaction commit (before "beforeCompletion").
* Can e.g. flush transactional O/R Mapping sessions to the database.
* <p>This callback does <i>not</i> mean that the transaction will actually be committed.
* A rollback decision can still occur after this method has been called. This callback
* is rather meant to perform work that"s only relevant if a commit still has a chance
* to happen, such as flushing SQL statements to the database.
* <p>Note that exceptions will get propagated to the commit caller and cause a
* rollback of the transaction.
* @param readOnly whether the transaction is defined as read-only transaction
* @throws RuntimeException in case of errors; will be <b>propagated to the caller</b>
* (note: do not throw TransactionException subclasses here!)
* @see #beforeCompletion
*/
default void beforeCommit(boolean readOnly) {
}
/**
* Invoked before transaction commit/rollback.
* Can perform resource cleanup <i>before</i> transaction completion.
* <p>This method will be invoked after {@code beforeCommit}, even when
* {@code beforeCommit} threw an exception. This callback allows for
* closing resources before transaction completion, for any outcome.
* @throws RuntimeException in case of errors; will be <b>logged but not propagated</b>
* (note: do not throw TransactionException subclasses here!)
* @see #beforeCommit
* @see #afterCompletion
*/
default void beforeCompletion() {
}
三、TransactionStatus + TransactionSynchronizationManager 的使用以及事务传播等级
前面讲了 TransactionStatus
是怎么创建的,创建后发生了什么,创建后怎么复用连接,何时复用连接,实际上就是事务传播等级,我们一个一个看:
-- 代码位于 org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction -- Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
我们刚才讲的是,获取不到 ConnectionHolder
的流程,那么如果获取到了,就是我们这里要讲的,传播等级。
3.1 PROPAGATION_NEVER
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation "never"");
}
既然是 never,那就不可能有 connectionHolder
,所以这里直接抛出异常。
3.2 PROPAGATION_NOT_SUPPORTED
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
可以看到第一步,suspend(trx)
挂了了一个事务,并且在创建 TransactionStatus
时,没有放入 transaction
对象,也就是连接对象。
3.3 PROPAGATION_REQUIRES_NEW
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
这里很纯粹,我们知道 PROPAGATION_REQUIRES_NEW 就是不管怎么样,都创建新的事务,使用新的连接,所以这里直接挂起事务,并且 newTransactionStatus
+ doBegin
+prepareSynchronization
三部曲,就是我们上个章节讲的 TransactionStatus
的初始化。
3.3 PROPAGATION_NESTED 与 SAVEPOINT保存点
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify "nestedTransactionAllowed" property with value "true"");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
默认的传播级别,也就是有事务就用这个事务,没有就新建一个。注意这里有一个叫做 savePoint
保存点的东西:savePoint
有个计数器,保存在 ConnectionHolder
里面,如果需要创建保存点,就会通过它操作 Connection
去启用保存点,如果我们报了错,可以回滚到此保存点,比如这一小段代码 SAVEPOINT_1,里面有一个内嵌事务, SAVAPOINT_2,执行这个内嵌出错,我们可以回滚 SAVAPOINT_2,而 SAVEPOINT_1 不受影响。
没有使用 savePoint
则还是三部曲,但是这个三部曲,由于我们已经有一个现有的连接,所以会创建一个 TransactionStatus
,但是他们的连接也就是 connectionHolder
使用的是同一个对象。
它的调用过程也是简单,我们的 PlatformTransactionManager
有一个叫做 rollback 的接口,它的实现有这么一段代码:
if (status.hasSavepoint()) { if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status);
}
3.4 suspend 和 resume?怎么实现的?
上面那些东西实际上都比较简单,就是判断是否获取新的连接,然后创建一个 TransactionInfo
对象,但是里面有一个稍微复杂点的东西,就是事务的挂起和恢复,是怎么做的?
我们先看挂起:
3.4.1 suspend 挂起事务
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException { if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
当一个事务被挂起,且处于同步状态(前面说的,调用过 prepare 最后一步 initSynchronization
,搞了一个 linkedHashSet<TransactionSynchronization>
),还划了重点的那个。
- 此时会先
doSuspendSynchronization();
,里面很简单,就是把我们注册的那些TransactionSynchronization
的suspend()
方法都跑一遍,并且把它们全部清除避免被跑两次以上,并且拿到这部分注册的linkedHashSet<TransactionSynchronization>
:suspendedSynchronizations
如下:
private List<TransactionSynchronization> doSuspendSynchronization() { List<TransactionSynchronization> suspendedSynchronizations =
TransactionSynchronizationManager.getSynchronizations();
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
synchronization.suspend();
}
TransactionSynchronizationManager.clearSynchronization();
return suspendedSynchronizations;
}
紧接着调用 PlatfromTransactionManager
的 doSuspend()
,代码也是很简单,就是把刚才来来回回讲的那个 transactionHolder
移除掉,返回的是我们的事务对象,也就是包含了 connection
的那个。
protected Object doSuspend(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
后续的操作也很容易看懂,我们把它保存在 TransactionSynchronizationManager
的那些 ThreadLocal
全部拿到原值,并把现有的值清除掉,并且我们把上面说的那些乱七八糟的包括 suspendedSynchronizations
,以及 transaction
一并放到一个叫做 SuspendedResourcesHolder
的对象里面。
这样,这个事务就和当前线程没有半毛钱关系了,这些变量全部被保存在 SuspendedResourcesHolder
里面,这个保存着事务信息的对象会保存在我们新的 TransactionStatus
里面,那么问题来了,怎么恢复?实际上你都知道怎么挂起了,还不知道怎么恢复吗?
3.4.2 resume 恢复事务
当然是反过来操作!
resume()
的调用入口如下:
AbstractPlatformTransactionManagergetTransaction(TransactionDefinition)
resumeAfterBeginException(Object, SuspendedResourcesHolder, Throwable)
cleanupAfterCompletion(DefaultTransactionStatus)
很简单,分两种,第一种是当前事务报错了会恢复到上一个事务。第二种是当前事务执行完毕了,会调用恢复。代码很简单,如下:我就不啰嗦了,是上面的反操作。
-- 代码位于 org.springframework.transaction.support.AbstractPlatformTransactionManager#resume -- protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
doResumeSynchronization(suspendedSynchronizations);
}
}
}
3.5 TransactionStatus 总结
我们再回顾以及总结一下 TransactionStatus
这个 spring 事务运行时的重要对象,它的重要的方法以及成员变量如下所示,我们的内嵌事务,传播等级,事务是不是同一个事物,无非就是来操作这几个方法:
四、剩余流程(提交与回滚)解析
我们已经把最重要对象 TransactionStatus
的源码解析的七七八八了,剩下的事务控制实际上已经很简单了,获取到 TransactionInfo
(主要是内部那个 TransactionStatus
以及对 TransactionSychronizationManager
的操作):
后续的操作我们看到,就是三部分:报错做什么、最终做什么、怎么提交。
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); // 创建或者拿到当前事务信息
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();// 执行切面
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);// 回滚逻辑
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);// 拿回上一个事务
}
commitTransactionAfterReturning(txInfo);// 提交当前事务
return retVal;
}
cleanupTransactionInfo()
我们已经讲过,就是 TransactionInfo
里有一个上一个事务 TransactionInfo
的引用,只是把它找回来,没什么太多的东西。
4.1 事务怎么做回滚
回滚的核心方法在这里:
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
我们可以看到,回滚的时候,会先保证处于我们标识的异常里,默认是所有的 Error
和 RuntimeException
,注意这点哦,如果你对非 RuntimeException
进行了抛出,是不会回滚的!或者你重写了 Throwable
,也不会!
回滚的方法比较简单,但是又几个要注意的点:
triggerBeforeCompletion(status); if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status);
}
else {
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we"re asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
我们可以看到有很多的触发(trigger),这些触发实际上都是我们刚才讲的那个 Set<TransactionSynchronization>
这里就不多说了。
这里的回滚分为几种情况:
- 有保存点的,刚才讲过了,看漏的回去翻翻
3.3 PROPAGATION_NESTED 与 SAVEPOINT保存点 - 3.4.2 resume 恢复事务
- 崭新的事务!这是最简单的,就是调用 connection.rollback()
protected void doRollback(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
con.rollback();
}
}
- 如果有事务,则通过
TransactionStatus
将ConnectionHolder
的变量rollbackOnly
设置为真
是的,到这里你发现除了保存点的回滚,这里没有做任何实质的回滚操作!它只是设置了一下 rollbackOnly
,然后抛出异常,不过这个异常比如 mybatis
会捕获,并调用 sqlSession.rollback()
真正做回滚,不过这是 mybatis 的操作了。
4.2 事务怎么做提交
提交也和回滚一样的道理,入口如下:
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) { if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
它的实现如下:
public final void commit(TransactionStatus status) throws TransactionException { if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
processCommit(defStatus);
}
在这里,它会判断有没有其他地方给 ConnectionHolder
设置了 rollbackOnly
,如果设置了,也会进行回滚操作,和 4.1 一样,最终抛出异常:
如果没有则正常调用 processCommit()
,,这里不细讲,因为和 processRollback()
没什么太大区别,就是几个触发(trigger),然后如果有保存点,就释放一下保存点,最终来到 PlatformTransactionManager#docommit()
。
值得注意的是,只有崭新的事务( newTransaction = true
)才会调用 docommit()
,内嵌事务就不是崭新的事务,怎么判断崭新事务,就是那个创建了 ConnectionHolder
,开启了连接的那一个事务,凡是拿了已有的 ConnectionHolder
进行操作的事务都不是崭新的事务:
protected void doCommit(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
}
代码一样很简单,这里就不啰嗦了。
参考源码:spring-boot 2.2.2,即 spring 5.2.2
以上是 spring详细事务源码解析——事务怎么挂起的?传播等级怎么实现的?保存点、内嵌事务怎么做回滚和提交? 的全部内容, 来源链接: utcz.com/z/512563.html