spring详细事务源码解析——事务怎么挂起的?传播等级怎么实现的?保存点、内嵌事务怎么做回滚和提交?

编程

零、引言

spring 事务怎么挂起的?传播等级怎么实现的?保存点、内嵌事务怎么做回滚和提交?如果你对这些感兴趣,或者想要了解它的运行机制,则可以参考此文章 ~

(打个广告,深圳或者其他城市也行,找工作 = =!有坑位请私信我 )

一、spring 事务大局纵览

上篇文章主要提到了 spring 如何去管理 mybatis 的事务,实际上就是借助 TransactionSynchronizationManager 来保存线程级别的连接对象,sqlSession对象,但我们没有过多提及 spring 本身的实现如何,我们先回顾一下几个主要的点。

  • 在创建 MapperProxy 时,spring 为其注入了一个 sqlSession 用于 sql执行,但是这个 sqlSession 是一个代理对象,叫做 sqlSessionTemplate,它会自动选择我们该使用哪个 sqlSession 去执行
  • 在执行时,spring 切面在执行事务之前,会创建一个叫做 TransactionInfo 的对象,此对象会根据事务传播等级来控制是否创建新连接,是否挂起上一个连接,将信息保存在 TransactionSynchronizationManager
  • 到了真正需要创建或者获取 sqlSession 时,spring 重写的 TransactionFactory 会优先去 TransactionSynchronizationManager 中拿连接对象。

上篇文章我们也零零碎碎讲到了 spring 事务管理的几个重要的接口,我们先从 TransactionInfo 这个类,它就是事务执行的上下文,TransactionInfo 的重要程度不言而喻。它包含了这么几个对象:

  • PlatformTransactionManager 事务进行几乎所有进行时操作的核心逻辑就在这里面,它有几个子类,我们主要还是讨论 DataSourceTransactionManager 这个子类,也就是对 JDBC 连接的管理。
  • TransactionDefition 事务在开启之前的一些配置信息,比如配置传播等级,隔离级别,超时控制等等。
  • TransactionStatus 事务的运行时信息基本都在这里面,比如连接信息,挂起的事务的信息(保存点),事务跑没跑完等等。
  • TransactionInfo - old ,每个 info 会包含它上个挂起事务的引用,可能为空。
  • Joinpointxxx 这个无所谓,debug 用的,包含了事务切面的信息的一个字符串。

还是从代码入手:也就是事务的执行纵览:

-- 代码位于 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction --

@Nullable

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,

final InvocationCallback invocation) throws Throwable {

// If the transaction attribute is null, the method is non-transactional.

TransactionAttributeSource tas = getTransactionAttributeSource();

final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);

final TransactionManager tm = determineTransactionManager(txAttr);

PlatformTransactionManager ptm = asPlatformTransactionManager(tm);

final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {

// Standard transaction demarcation with getTransaction and commit/rollback calls.

TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); // 创建或者拿到当前事务信息

Object retVal;

try {

// This is an around advice: Invoke the next interceptor in the chain.

// This will normally result in a target object being invoked.

retVal = invocation.proceedWithInvocation();// 执行切面

}

catch (Throwable ex) {

// target invocation exception

completeTransactionAfterThrowing(txInfo, ex);// 回滚逻辑

throw ex;

}

finally {

cleanupTransactionInfo(txInfo);// 拿回上一个事务

}

commitTransactionAfterReturning(txInfo);// 提交当前事务

return retVal;

}

}

上述代码逻辑还是很清晰的

  • 先从 TransactionDefinition(TransactionAttribute) 开始,看看当前这个切面需不需要执行事务,
  • 如果能获取到,则通过其获取到合适的 PlatformTransactionManager,创建或者获取、封装到我们的 TransactionInfo
  • 执行切面,再根据切面情况选择提交或者回滚。

我们知道事务最终都会创建一个 TransactionInfo,这个对象创建的最后一步挺有意思:

-- 代码位于 org.springframework.transaction.interceptor.TransactionAspectSupport#prepareTransactionInfo --

-- 是方法 createTransactionIfNecessary 的最后一步 --

protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,

@Nullable TransactionAttribute txAttr, String joinpointIdentification,

@Nullable TransactionStatus status) {

TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);

// We always bind the TransactionInfo to the thread, even if we didn"t create

// a new transaction here. This guarantees that the TransactionInfo stack

// will be managed correctly even if no transaction was created by this aspect.

txInfo.bindToThread();

return txInfo;

}

private static final ThreadLocal<TransactionInfo> transactionInfoHolder =

new NamedThreadLocal<>("Current aspect-driven transaction");

private void bindToThread() {

// Expose current TransactionStatus, preserving any existing TransactionStatus

// for restoration after this transaction is complete.

this.oldTransactionInfo = transactionInfoHolder.get();

transactionInfoHolder.set(this);

}

我们发现它这里有一个 ThreadLocal,它会将之前已经保存在这里的 TransactionInfo 拿出来,放到刚才上面提到的 old 里面持有,而当前这个 TransactionInfo 对象则会扔进这个 ThreadLocal 里面,然后在执行完切面后把 old 放回去,形成了一个事务栈:

	protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {

if (txInfo != null) {

txInfo.restoreThreadLocalStatus();

}

}

private void restoreThreadLocalStatus() {

// Use stack to restore old transaction TransactionInfo.

// Will be null if none was set.

transactionInfoHolder.set(this.oldTransactionInfo);

}

二、TransactionStatus 源码解析

TransactionDefinition 基于配置,或者我们注解获取到的,其中又定义了该使用哪个 PlatformTransactionManager,这些相对来说都比较简单。而 TransactionStatus 事务运行时这个最重要的对象的创建与使用却是比较复杂的。

它的创建大体可以分为如下流程:

2.1、创建

它的创建是比较简单的,没有用工厂,直接创建即可。

	public DefaultTransactionStatus(

@Nullable Object transaction, boolean newTransaction, boolean newSynchronization,

boolean readOnly, boolean debug, @Nullable Object suspendedResources) {

this.transaction = transaction;

this.newTransaction = newTransaction;

this.newSynchronization = newSynchronization;

this.readOnly = readOnly;

this.debug = debug;

this.suspendedResources = suspendedResources;

}

transaction 是里面一个比较重要的对象,它内置了连接对象,下面是 Transaction 对象的获取过程:

	DataSourceTransactionObject txObject = new DataSourceTransactionObject();

txObject.setSavepointAllowed(isNestedTransactionAllowed());

ConnectionHolder conHolder =

(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());

txObject.setConnectionHolder(conHolder, false);

return txObject;

我们可以看到核心的就是 ConnectionHolder,这个对象尤为重要,注意,这里并没有建立连接,而且也可能获取到空的 connectionHolder,连接的包裹对象 ConnectionHolder 可以控制两个事物到底是不是用的同一个连接(同一个真正的事务,一起rollback 一起commit 的那种)。

此对象存放在一个叫做 TransactionSynchronizationManager 的工具单例类里面,它内部持有非常多个 ThreadLocal 来存放事务信息。

2.2、连接初始化

上面说了没有真正建立连接,建立连接是在 doBegin() 的方法里做的:

	/**

* This implementation sets the isolation level but ignores the timeout.

*/

@Override

protected void doBegin(Object transaction, TransactionDefinition definition) {

DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

Connection con = null;

// 第一步,根据是否受事务管理,或者有没有 connectionHolder 对象去创建新连接,是否获取一个新连接

if (!txObject.hasConnectionHolder() ||

txObject.getConnectionHolder().isSynchronizedWithTransaction()) {

Connection newCon = obtainDataSource().getConnection();

if (logger.isDebugEnabled()) {

logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");

}

txObject.setConnectionHolder(new ConnectionHolder(newCon), true);

}

// 设置此事务已经受事务管理

txObject.getConnectionHolder().setSynchronizedWithTransaction(true);

con = txObject.getConnectionHolder().getConnection();

// 设置一些 `TransactionDefinition` 过来的属性,比如隔离等级,传播等级,是否只读等等,并将其设置为可用。

Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);

txObject.setPreviousIsolationLevel(previousIsolationLevel);

txObject.setReadOnly(definition.isReadOnly());

// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,

// so we don"t want to do it unnecessarily (for example if we"ve explicitly

// configured the connection pool to set it already).

if (con.getAutoCommit()) {

txObject.setMustRestoreAutoCommit(true);

if (logger.isDebugEnabled()) {

logger.debug("Switching JDBC Connection [" + con + "] to manual commit");

}

con.setAutoCommit(false);

}

prepareTransactionalConnection(con, definition);

txObject.getConnectionHolder().setTransactionActive(true);

int timeout = determineTimeout(definition);

if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {

txObject.getConnectionHolder().setTimeoutInSeconds(timeout);

}

//--------------------------------------------------------------------------------------

// Bind the connection holder to the thread.

if (txObject.isNewConnectionHolder()) {

TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());

}

}

}

代码很长但是很简单,从上到下依次为:

  1. 如果没有 TransactionHolder 或者不属于其他事务管理,则创建一个新连接 obtainDataSource().getConnection();,并且把新创建的连接放入 connectionHoler
  2. 将其设置为已经受事务同步管理 setSynchronizedWithTransaction(true)
  3. 设置一些 TransactionDefinition 过来的属性,比如隔离等级,传播等级,是否只读等等,并将其设置为可用。
  4. 把它塞入或者塞回 TransactionSynchronizationManagerThreadLocal 里面,也就是将 connectionHolder 与当前线程绑定。

2.3、对 TransactionStatus 进行配置

方法如下,非常简单,但是十分重要:

	protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {

if (status.isNewSynchronization()) {

TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());

TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(

definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?

definition.getIsolationLevel() : null);

TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());

TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());

TransactionSynchronizationManager.initSynchronization();

}

}

我们发现又是 TransactionSynchronizationManager,这个事务信息保存对象不仅仅保存 connectionHolder,还保存它是否是一个事务,setActualTransactionActive,比如我们有些传播等级是有事务则使用事务,没有就不适用,spring 还是会给它准备一个 TransactionInfo,但是里面的实现基本上是空的。

后面是设置一些传播等级,名字,是否只读之类的属性,最后在 initSynchronization() 方法里面,创建了一个叫做 synchronizationslinkedHashSet<TransactionSynchronization>划重点,这个东西类似一个监听器,在触发事务的某些行为时会被调用,比如被挂起,被重新使用等等:

	/**

* Suspend this synchronization.

* Supposed to unbind resources from TransactionSynchronizationManager if managing any.

* @see TransactionSynchronizationManager#unbindResource

*/

default void suspend() {

}

/**

* Resume this synchronization.

* Supposed to rebind resources to TransactionSynchronizationManager if managing any.

* @see TransactionSynchronizationManager#bindResource

*/

default void resume() {

}

/**

* Flush the underlying session to the datastore, if applicable:

* for example, a Hibernate/JPA session.

* @see org.springframework.transaction.TransactionStatus#flush()

*/

@Override

default void flush() {

}

/**

* Invoked before transaction commit (before "beforeCompletion").

* Can e.g. flush transactional O/R Mapping sessions to the database.

* <p>This callback does <i>not</i> mean that the transaction will actually be committed.

* A rollback decision can still occur after this method has been called. This callback

* is rather meant to perform work that"s only relevant if a commit still has a chance

* to happen, such as flushing SQL statements to the database.

* <p>Note that exceptions will get propagated to the commit caller and cause a

* rollback of the transaction.

* @param readOnly whether the transaction is defined as read-only transaction

* @throws RuntimeException in case of errors; will be <b>propagated to the caller</b>

* (note: do not throw TransactionException subclasses here!)

* @see #beforeCompletion

*/

default void beforeCommit(boolean readOnly) {

}

/**

* Invoked before transaction commit/rollback.

* Can perform resource cleanup <i>before</i> transaction completion.

* <p>This method will be invoked after {@code beforeCommit}, even when

* {@code beforeCommit} threw an exception. This callback allows for

* closing resources before transaction completion, for any outcome.

* @throws RuntimeException in case of errors; will be <b>logged but not propagated</b>

* (note: do not throw TransactionException subclasses here!)

* @see #beforeCommit

* @see #afterCompletion

*/

default void beforeCompletion() {

}

三、TransactionStatus + TransactionSynchronizationManager 的使用以及事务传播等级

前面讲了 TransactionStatus 是怎么创建的,创建后发生了什么,创建后怎么复用连接,何时复用连接,实际上就是事务传播等级,我们一个一个看:

-- 代码位于 org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction --

Object transaction = doGetTransaction();

boolean debugEnabled = logger.isDebugEnabled();

if (isExistingTransaction(transaction)) {

// Existing transaction found -> check propagation behavior to find out how to behave.

return handleExistingTransaction(def, transaction, debugEnabled);

}

我们刚才讲的是,获取不到 ConnectionHolder 的流程,那么如果获取到了,就是我们这里要讲的,传播等级。

3.1 PROPAGATION_NEVER

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {

throw new IllegalTransactionStateException(

"Existing transaction found for transaction marked with propagation "never"");

}

既然是 never,那就不可能有 connectionHolder,所以这里直接抛出异常。

3.2 PROPAGATION_NOT_SUPPORTED

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {

if (debugEnabled) {

logger.debug("Suspending current transaction");

}

Object suspendedResources = suspend(transaction);

boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);

return prepareTransactionStatus(

definition, null, false, newSynchronization, debugEnabled, suspendedResources);

}

可以看到第一步,suspend(trx) 挂了了一个事务,并且在创建 TransactionStatus 时,没有放入 transaction 对象,也就是连接对象。

3.3 PROPAGATION_REQUIRES_NEW

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {

if (debugEnabled) {

logger.debug("Suspending current transaction, creating new transaction with name [" +

definition.getName() + "]");

}

SuspendedResourcesHolder suspendedResources = suspend(transaction);

try {

boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

DefaultTransactionStatus status = newTransactionStatus(

definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

doBegin(transaction, definition);

prepareSynchronization(status, definition);

return status;

}

catch (RuntimeException | Error beginEx) {

resumeAfterBeginException(transaction, suspendedResources, beginEx);

throw beginEx;

}

}

这里很纯粹,我们知道 PROPAGATION_REQUIRES_NEW 就是不管怎么样,都创建新的事务,使用新的连接,所以这里直接挂起事务,并且 newTransactionStatus + doBegin +prepareSynchronization 三部曲,就是我们上个章节讲的 TransactionStatus 的初始化。

3.3 PROPAGATION_NESTED 与 SAVEPOINT保存点

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

if (!isNestedTransactionAllowed()) {

throw new NestedTransactionNotSupportedException(

"Transaction manager does not allow nested transactions by default - " +

"specify "nestedTransactionAllowed" property with value "true"");

}

if (debugEnabled) {

logger.debug("Creating nested transaction with name [" + definition.getName() + "]");

}

if (useSavepointForNestedTransaction()) {

// Create savepoint within existing Spring-managed transaction,

// through the SavepointManager API implemented by TransactionStatus.

// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.

DefaultTransactionStatus status =

prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);

status.createAndHoldSavepoint();

return status;

}

else {

// Nested transaction through nested begin and commit/rollback calls.

// Usually only for JTA: Spring synchronization might get activated here

// in case of a pre-existing JTA transaction.

boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

DefaultTransactionStatus status = newTransactionStatus(

definition, transaction, true, newSynchronization, debugEnabled, null);

doBegin(transaction, definition);

prepareSynchronization(status, definition);

return status;

}

}

默认的传播级别,也就是有事务就用这个事务,没有就新建一个。注意这里有一个叫做 savePoint 保存点的东西:savePoint 有个计数器,保存在 ConnectionHolder 里面,如果需要创建保存点,就会通过它操作 Connection 去启用保存点,如果我们报了错,可以回滚到此保存点,比如这一小段代码 SAVEPOINT_1,里面有一个内嵌事务, SAVAPOINT_2,执行这个内嵌出错,我们可以回滚 SAVAPOINT_2,而 SAVEPOINT_1 不受影响。

没有使用 savePoint 则还是三部曲,但是这个三部曲,由于我们已经有一个现有的连接,所以会创建一个 TransactionStatus,但是他们的连接也就是 connectionHolder 使用的是同一个对象。

它的调用过程也是简单,我们的 PlatformTransactionManager 有一个叫做 rollback 的接口,它的实现有这么一段代码:

	if (status.hasSavepoint()) {

if (status.isDebug()) {

logger.debug("Rolling back transaction to savepoint");

}

status.rollbackToHeldSavepoint();

}

else if (status.isNewTransaction()) {

if (status.isDebug()) {

logger.debug("Initiating transaction rollback");

}

doRollback(status);

}

3.4 suspend 和 resume?怎么实现的?

上面那些东西实际上都比较简单,就是判断是否获取新的连接,然后创建一个 TransactionInfo 对象,但是里面有一个稍微复杂点的东西,就是事务的挂起和恢复,是怎么做的?

我们先看挂起:

3.4.1 suspend 挂起事务

	protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {

if (TransactionSynchronizationManager.isSynchronizationActive()) {

List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();

try {

Object suspendedResources = null;

if (transaction != null) {

suspendedResources = doSuspend(transaction);

}

String name = TransactionSynchronizationManager.getCurrentTransactionName();

TransactionSynchronizationManager.setCurrentTransactionName(null);

boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();

TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);

Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();

TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);

boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();

TransactionSynchronizationManager.setActualTransactionActive(false);

return new SuspendedResourcesHolder(

suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);

}

catch (RuntimeException | Error ex) {

// doSuspend failed - original transaction is still active...

doResumeSynchronization(suspendedSynchronizations);

throw ex;

}

}

else if (transaction != null) {

// Transaction active but no synchronization active.

Object suspendedResources = doSuspend(transaction);

return new SuspendedResourcesHolder(suspendedResources);

}

else {

// Neither transaction nor synchronization active.

return null;

}

}

当一个事务被挂起,且处于同步状态(前面说的,调用过 prepare 最后一步 initSynchronization,搞了一个 linkedHashSet<TransactionSynchronization>),还划了重点的那个

  • 此时会先 doSuspendSynchronization(); ,里面很简单,就是把我们注册的那些 TransactionSynchronizationsuspend() 方法都跑一遍,并且把它们全部清除避免被跑两次以上,并且拿到这部分注册的 linkedHashSet<TransactionSynchronization> : suspendedSynchronizations 如下:

	private List<TransactionSynchronization> doSuspendSynchronization() {

List<TransactionSynchronization> suspendedSynchronizations =

TransactionSynchronizationManager.getSynchronizations();

for (TransactionSynchronization synchronization : suspendedSynchronizations) {

synchronization.suspend();

}

TransactionSynchronizationManager.clearSynchronization();

return suspendedSynchronizations;

}

紧接着调用 PlatfromTransactionManagerdoSuspend(),代码也是很简单,就是把刚才来来回回讲的那个 transactionHolder 移除掉,返回的是我们的事务对象,也就是包含了 connection 的那个。

	protected Object doSuspend(Object transaction) {

DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

txObject.setConnectionHolder(null);

return TransactionSynchronizationManager.unbindResource(obtainDataSource());

}

后续的操作也很容易看懂,我们把它保存在 TransactionSynchronizationManager 的那些 ThreadLocal 全部拿到原值,并把现有的值清除掉,并且我们把上面说的那些乱七八糟的包括 suspendedSynchronizations ,以及 transaction 一并放到一个叫做 SuspendedResourcesHolder 的对象里面。

这样,这个事务就和当前线程没有半毛钱关系了,这些变量全部被保存在 SuspendedResourcesHolder 里面,这个保存着事务信息的对象会保存在我们新的 TransactionStatus 里面,那么问题来了,怎么恢复?实际上你都知道怎么挂起了,还不知道怎么恢复吗?

3.4.2 resume 恢复事务

当然是反过来操作!

resume() 的调用入口如下:

AbstractPlatformTransactionManager

getTransaction(TransactionDefinition)

resumeAfterBeginException(Object, SuspendedResourcesHolder, Throwable)

cleanupAfterCompletion(DefaultTransactionStatus)

很简单,分两种,第一种是当前事务报错了会恢复到上一个事务。第二种是当前事务执行完毕了,会调用恢复。代码很简单,如下:我就不啰嗦了,是上面的反操作。

-- 代码位于 org.springframework.transaction.support.AbstractPlatformTransactionManager#resume --

protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)

throws TransactionException {

if (resourcesHolder != null) {

Object suspendedResources = resourcesHolder.suspendedResources;

if (suspendedResources != null) {

doResume(transaction, suspendedResources);

}

List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;

if (suspendedSynchronizations != null) {

TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);

TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);

TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);

TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);

doResumeSynchronization(suspendedSynchronizations);

}

}

}

3.5 TransactionStatus 总结

我们再回顾以及总结一下 TransactionStatus 这个 spring 事务运行时的重要对象,它的重要的方法以及成员变量如下所示,我们的内嵌事务,传播等级,事务是不是同一个事物,无非就是来操作这几个方法:

四、剩余流程(提交与回滚)解析

我们已经把最重要对象 TransactionStatus 的源码解析的七七八八了,剩下的事务控制实际上已经很简单了,获取到 TransactionInfo (主要是内部那个 TransactionStatus 以及对 TransactionSychronizationManager 的操作):

后续的操作我们看到,就是三部分:报错做什么、最终做什么、怎么提交。

if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {

// Standard transaction demarcation with getTransaction and commit/rollback calls.

TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); // 创建或者拿到当前事务信息

Object retVal;

try {

// This is an around advice: Invoke the next interceptor in the chain.

// This will normally result in a target object being invoked.

retVal = invocation.proceedWithInvocation();// 执行切面

}

catch (Throwable ex) {

// target invocation exception

completeTransactionAfterThrowing(txInfo, ex);// 回滚逻辑

throw ex;

}

finally {

cleanupTransactionInfo(txInfo);// 拿回上一个事务

}

commitTransactionAfterReturning(txInfo);// 提交当前事务

return retVal;

}

cleanupTransactionInfo() 我们已经讲过,就是 TransactionInfo 里有一个上一个事务 TransactionInfo 的引用,只是把它找回来,没什么太多的东西。

4.1 事务怎么做回滚

回滚的核心方法在这里:

			if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {

try {

txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());

}

catch (TransactionSystemException ex2) {

logger.error("Application exception overridden by rollback exception", ex);

ex2.initApplicationException(ex);

throw ex2;

}

catch (RuntimeException | Error ex2) {

logger.error("Application exception overridden by rollback exception", ex);

throw ex2;

}

}

我们可以看到,回滚的时候,会先保证处于我们标识的异常里,默认是所有的 ErrorRuntimeException ,注意这点哦,如果你对非 RuntimeException 进行了抛出,是不会回滚的!或者你重写了 Throwable,也不会!

回滚的方法比较简单,但是又几个要注意的点:

				triggerBeforeCompletion(status);

if (status.hasSavepoint()) {

if (status.isDebug()) {

logger.debug("Rolling back transaction to savepoint");

}

status.rollbackToHeldSavepoint();

}

else if (status.isNewTransaction()) {

if (status.isDebug()) {

logger.debug("Initiating transaction rollback");

}

doRollback(status);

}

else {

// Participating in larger transaction

if (status.hasTransaction()) {

if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {

if (status.isDebug()) {

logger.debug("Participating transaction failed - marking existing transaction as rollback-only");

}

doSetRollbackOnly(status);

}

else {

if (status.isDebug()) {

logger.debug("Participating transaction failed - letting transaction originator decide on rollback");

}

}

}

else {

logger.debug("Should roll back transaction but cannot - no transaction available");

}

// Unexpected rollback only matters here if we"re asked to fail early

if (!isFailEarlyOnGlobalRollbackOnly()) {

unexpectedRollback = false;

}

}

}

catch (RuntimeException | Error ex) {

triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);

throw ex;

}

triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

我们可以看到有很多的触发(trigger),这些触发实际上都是我们刚才讲的那个 Set<TransactionSynchronization> 这里就不多说了。

这里的回滚分为几种情况:

  1. 有保存点的,刚才讲过了,看漏的回去翻翻 3.3 PROPAGATION_NESTED 与 SAVEPOINT保存点 - 3.4.2 resume 恢复事务
  2. 崭新的事务!这是最简单的,就是调用 connection.rollback()

	protected void doRollback(DefaultTransactionStatus status) {

DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();

Connection con = txObject.getConnectionHolder().getConnection();

if (status.isDebug()) {

logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");

}

try {

con.rollback();

}

}

  1. 如果有事务,则通过 TransactionStatusConnectionHolder 的变量 rollbackOnly 设置为真

是的,到这里你发现除了保存点的回滚,这里没有做任何实质的回滚操作!它只是设置了一下 rollbackOnly,然后抛出异常,不过这个异常比如 mybatis 会捕获,并调用 sqlSession.rollback() 真正做回滚,不过这是 mybatis 的操作了。

4.2 事务怎么做提交

提交也和回滚一样的道理,入口如下:

	protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {

if (txInfo != null && txInfo.getTransactionStatus() != null) {

if (logger.isTraceEnabled()) {

logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");

}

txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());

}

}

它的实现如下:

	public final void commit(TransactionStatus status) throws TransactionException {

if (status.isCompleted()) {

throw new IllegalTransactionStateException(

"Transaction is already completed - do not call commit or rollback more than once per transaction");

}

DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;

if (defStatus.isLocalRollbackOnly()) {

if (defStatus.isDebug()) {

logger.debug("Transactional code has requested rollback");

}

processRollback(defStatus, false);

return;

}

if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {

if (defStatus.isDebug()) {

logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");

}

processRollback(defStatus, true);

return;

}

processCommit(defStatus);

}

在这里,它会判断有没有其他地方给 ConnectionHolder 设置了 rollbackOnly,如果设置了,也会进行回滚操作,和 4.1 一样,最终抛出异常:

如果没有则正常调用 processCommit(),,这里不细讲,因为和 processRollback() 没什么太大区别,就是几个触发(trigger),然后如果有保存点,就释放一下保存点,最终来到 PlatformTransactionManager#docommit()

值得注意的是,只有崭新的事务( newTransaction = true )才会调用 docommit(),内嵌事务就不是崭新的事务,怎么判断崭新事务,就是那个创建了 ConnectionHolder,开启了连接的那一个事务,凡是拿了已有的 ConnectionHolder 进行操作的事务都不是崭新的事务:

	protected void doCommit(DefaultTransactionStatus status) {

DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();

Connection con = txObject.getConnectionHolder().getConnection();

if (status.isDebug()) {

logger.debug("Committing JDBC transaction on Connection [" + con + "]");

}

try {

con.commit();

}

}

代码一样很简单,这里就不啰嗦了。


参考源码:spring-boot 2.2.2,即 spring 5.2.2

以上是 spring详细事务源码解析——事务怎么挂起的?传播等级怎么实现的?保存点、内嵌事务怎么做回滚和提交? 的全部内容, 来源链接: utcz.com/z/512563.html

回到顶部