Spring事务01TransactionInterceptor

编程

DataSourceTransactionManager 类图,主要功能:获取事务对象、开启事务、提交事务、回滚事务

/**
 * Return a transaction object for the current transaction state.
 *
 * @return the transaction object for the current transaction state
 */
protected Object doGetTransaction();

/**
 * Begin a new transaction with semantics according to the given transaction definition.
 *
 * @param transaction the transaction object to begin the transaction on
 * @param definition the transaction definition whose semantics apply to the new transaction
 */
protected void doBegin(Object transaction, TransactionDefinition definition);

/**
 * Perform an actual commit of the given transaction.
 *
 * @param status the status representation of the transaction to commit
 */
protected void doCommit(DefaultTransactionStatus status);

/**
 * Perform an actual rollback of the given transaction.
 *
 * @param status the status representation of the transaction to roll back
 */
protected void doRollback(DefaultTransactionStatus status);

TransactionInterceptor的类结构图,实现了 环绕通知 MethodInterceptor 接口

TransactionInterceptor 实现了 MethodInterceptor 接口,TransactionInterceptor 的 invoke() 方法:

/**
 * Around-advice entry point: resolves the target class of the invocation and
 * hands the call over to {@code invokeWithinTransaction}.
 *
 * @param invocation the method invocation being intercepted
 * @return whatever {@code invokeWithinTransaction} returns for this invocation
 * @throws Throwable propagated from {@code invokeWithinTransaction}
 */
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
    // The target class stays null when the invocation carries no target object.
    // It is passed along with the method, since the method may come from an interface.
    Class<?> targetClass = null;
    if (invocation.getThis() != null) {
        targetClass = AopUtils.getTargetClass(invocation.getThis());
    }

    // Delegate to TransactionAspectSupport's invokeWithinTransaction,
    // using invocation::proceed as the callback that continues the call.
    return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

TransactionInterceptor类的invoke()方法会调用其父类TransactionAspectSupport中的invokeWithinTransaction()方法,方法实现如下:

/**
 * General delegate for around-advice-based subclasses, delegating to several other template
 * methods on this class. Able to handle {@link CallbackPreferringPlatformTransactionManager}
 * as well as regular {@link PlatformTransactionManager} implementations.
 * <p>When there is no transaction attribute, or the manager is not a
 * {@code CallbackPreferringPlatformTransactionManager}, the invocation is wrapped in
 * {@code createTransactionIfNecessary} / {@code commitTransactionAfterReturning} /
 * {@code completeTransactionAfterThrowing}. Otherwise the invocation is run inside the
 * manager's {@code execute} callback.
 * @param method the Method being invoked
 * @param targetClass the target class that we're invoking the method on
 * @param invocation the callback to use for proceeding with the target invocation
 * @return the return value of the method, if any
 * @throws Throwable propagated from the target invocation
 */
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        final InvocationCallback invocation) throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        // Decide whether a transaction needs to be started.
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            // Run the callback; with no further interceptors this enters the transactional method.
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // target invocation exception
            // The transactional call failed: complete the transaction for the failure case,
            // then rethrow the original exception.
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            // Runs on both the success and the failure path.
            cleanupTransactionInfo(txInfo);
        }
        // The transactional call completed without an exception.
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }

    else {
        // Holds an exception that must not trigger a rollback, so it can be rethrown
        // after execute() has returned.
        final ThrowableHolder throwableHolder = new ThrowableHolder();

        // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
        try {
            Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
                TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                try {
                    return invocation.proceedWithInvocation();
                }
                catch (Throwable ex) {
                    if (txAttr.rollbackOn(ex)) {
                        // A RuntimeException: will lead to a rollback.
                        if (ex instanceof RuntimeException) {
                            throw (RuntimeException) ex;
                        }
                        else {
                            // Wrap other throwables so they can leave the callback;
                            // unwrapped again in the ThrowableHolderException catch below.
                            throw new ThrowableHolderException(ex);
                        }
                    }
                    else {
                        // A normal return value: will lead to a commit.
                        throwableHolder.throwable = ex;
                        return null;
                    }
                }
                finally {
                    cleanupTransactionInfo(txInfo);
                }
            });

            // Check result state: It might indicate a Throwable to rethrow.
            if (throwableHolder.throwable != null) {
                throw throwableHolder.throwable;
            }
            return result;
        }
        catch (ThrowableHolderException ex) {
            // Rethrow the throwable that was wrapped inside the callback.
            throw ex.getCause();
        }
        catch (TransactionSystemException ex2) {
            if (throwableHolder.throwable != null) {
                // Log the held application exception and attach it to the system exception.
                logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                ex2.initApplicationException(throwableHolder.throwable);
            }
            throw ex2;
        }
        catch (Throwable ex2) {
            if (throwableHolder.throwable != null) {
                // Log the held application exception before it is superseded by ex2.
                logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
            }
            throw ex2;
        }
    }
}

重点分析createTransactionIfNecessary()方法,根据事务的传播属性做出不同的处理,核心是通过TransactionStatus来判断事务的属性。createTransactionIfNecessary()方法如下:

/**
 * Obtain a transaction from the given manager when a transaction attribute is present,
 * and wrap the outcome in a {@code TransactionInfo}.
 * <p>An attribute without a name is first wrapped so that it reports the joinpoint
 * identification as its name. When an attribute is present but no transaction manager
 * is available, the joinpoint is skipped (a debug message is logged) and no transaction
 * status is obtained.
 * @param tm the transaction manager to obtain the transaction from (may be {@code null})
 * @param txAttr the TransactionAttribute (may be {@code null})
 * @param joinpointIdentification the fully qualified method name
 * (used for monitoring and logging purposes)
 * @return the result of {@code prepareTransactionInfo}, whether or not a
 * transaction status was obtained
 * @see #getTransactionAttributeSource()
 */
@SuppressWarnings("serial")
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

    // No explicit name on the attribute: fall back to the method identification.
    boolean needsDefaultName = (txAttr != null && txAttr.getName() == null);
    if (needsDefaultName) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm == null) {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                        "] because no transaction manager has been configured");
            }
        }
        else {
            status = tm.getTransaction(txAttr);
        }
    }

    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

createTransactionIfNecessary()中会调用PlatformTransactionManager接口的getTransaction()方法,实际执行的是该接口的抽象实现类AbstractPlatformTransactionManager中的getTransaction()方法,实现如下:

/**

* This implementation handles propagation behavior. Delegates to

* {@code doGetTransaction}, {@code isExistingTransaction}

* and {@code doBegin}.

* @see #doGetTransaction

* @see #isExistingTransaction

* @see #doBegin

*/

@Override

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {

// 调用 DataSourceTransactionManager.doGetTransaction()方法

// 将分析这个方法

Object transaction = doGetTransaction();

// Cache debug flag to avoid repeated checks.

boolean debugEnabled = logger.isDebugEnabled();

if (definition == null) {

// Use defaults if no transaction definition given.

definition = new DefaultTransactionDefinition();

}

// 是否已经存在一个 transaction

if (isExistingTransaction(transaction)) {

// Existing transaction found -> check propagation behavior to find out how to behave.

return handleExistingTransaction(definition, transaction, debugEnabled);

}

// Check definition settings for new transaction.

if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {

throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());

}

// No existing transaction found -> check propagation behavior to find out how to proceed.

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {

throw new IllegalTransactionStateException(

"No existing transaction found for transaction marked with propagation "mandatory"");

}

// 如果是:PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED

// 这三种类型将开启一个新的事务

else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||

definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||

definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

SuspendedResourcesHolder suspendedResources = suspend(null);

if (debugEnabled) {

logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);

}

try {

boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

DefaultTransactionStatus status = newTransactionStatus(

definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

// 开始新事务,将分析这个方法

doBegin(transaction, definition);

prepareSynchronization(status, definition);

return status;

}

catch (RuntimeException | Error ex) {

resume(null, suspendedResources);

throw ex;

}

}

else {

// Create "empty" transaction: no actual transaction, but potentially synchronization.

if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {

logger.warn("Custom isolation level specified but no actual transaction initiated; " +

"isolation level will effectively be ignored: " + definition);

}

boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);

return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);

}

}

getTransaction()方法会调用子类DataSourceTransactionManager中的doGetTransaction()方法,doGetTransaction()方法代码如下:

/**
 * Build the transaction object for the current transaction state.
 * <p>A fresh {@code DataSourceTransactionObject} is created, its savepoint permission
 * is taken from {@code isNestedTransactionAllowed()}, and it is given whatever
 * {@code ConnectionHolder} the {@code TransactionSynchronizationManager} currently
 * returns for this manager's DataSource (possibly {@code null}), flagged as not new.
 * @return the current transaction object
 * @see #doBegin
 * @see #doCommit
 * @see #doRollback
 */
@Override
protected Object doGetTransaction() {
    DataSourceTransactionObject transactionObject = new DataSourceTransactionObject();
    transactionObject.setSavepointAllowed(isNestedTransactionAllowed());

    // Look up the resource registered for this DataSource, if any.
    Object boundResource = TransactionSynchronizationManager.getResource(obtainDataSource());
    transactionObject.setConnectionHolder((ConnectionHolder) boundResource, false);

    return transactionObject;
}

doGetTransaction()方法根据DataSource数据源获取DataSourceTransactionObject对象。接下来分析doBegin()方法,doBegin()方法的实现在DataSourceTransactionManager中,doBegin()方法如下:

/**
 * Begin a new transaction with semantics according to the given transaction
 * definition. Does not have to care about applying the propagation behavior,
 * as this has already been handled by the abstract manager.
 * <p>This method gets called when the transaction manager has decided to actually
 * start a new transaction. Either there wasn't any transaction before, or the
 * previous transaction has been suspended.
 * <p>A special scenario is a nested transaction without savepoint: If
 * {@code useSavepointForNestedTransaction()} returns "false", this method
 * will be called to start a nested transaction when necessary. In such a context,
 * there will be an active transaction: The implementation of this method has
 * to detect this and start an appropriate nested transaction.
 * <p>This implementation obtains a new Connection when the transaction object has no
 * ConnectionHolder (or its holder is already synchronized with a transaction),
 * prepares the connection for the definition, switches it to manual commit if it
 * is in auto-commit mode, and binds a newly created holder via
 * {@code TransactionSynchronizationManager.bindResource}.
 * <p>NOTE(review): the original listing carried the sentence "This implementation sets
 * the isolation level but ignores the timeout", yet the body below does store a
 * non-default timeout on the ConnectionHolder - confirm which statement is intended.
 * @param transaction transaction object returned by {@code doGetTransaction}
 * @param definition a TransactionDefinition instance, describing propagation
 * behavior, isolation level, read-only flag, timeout, and transaction name
 * @throws TransactionException in case of creation or system errors
 */
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        // Fetch a fresh Connection unless an unsynchronized holder is already present.
        if (!txObject.hasConnectionHolder() ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            // 'true' marks the holder as new; checked again before binding and in the catch block.
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        // Remember the value returned by prepareConnectionForTransaction on the transaction object.
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);

        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don't want to do it unnecessarily (for example if we've explicitly
        // configured the connection pool to set it already).
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            // Start the transaction: turn auto-commit off.
            con.setAutoCommit(false);
        }

        prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);

        // Only a non-default timeout is recorded on the holder.
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // Bind the connection holder to the thread.
        // Only a newly created holder is registered with TransactionSynchronizationManager here.
        // Per the original author's note this ultimately stores the Connection in a
        // ThreadLocal - not visible in this method, confirm in TransactionSynchronizationManager.
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    }

    catch (Throwable ex) {
        // Release the connection and clear the holder only if this call created the holder.
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}

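由于doBegin()方法已经通过TransactionSynchronizationManager.bindResource()把ConnectionHolder绑定到了当前线程,所以同一个线程再次进入doGetTransaction()方法时,通过TransactionSynchronizationManager.getResource()获取到的就是同一个ConnectionHolder。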

回到TransactionAspectSupport类的invokeWithinTransaction()方法,接下来将会调用InvocationCallback的proceedWithInvocation()方法,该方法的实现是调用了ReflectiveMethodInvocation类的proceed()方法,在3.6节的动态代理对象执行的部分已介绍过,此处不再赘述。

执行完代理对象的相关操作后,将会执行提交操作或者是回滚操作。

提交操作commitTransactionAfterReturning()方法如下(该方法在TransactionAspectSupport#invokeWithinTransaction()方法中被调用):

/**
 * Commit after the call completed successfully (not used after an exception was handled).
 * Nothing happens when there is no transaction info or it carries no transaction status.
 * @param txInfo information about the current transaction
 */
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    // Nothing to complete without a transaction status.
    if (txInfo == null || txInfo.getTransactionStatus() == null) {
        return;
    }

    if (logger.isTraceEnabled()) {
        logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
    }
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}

commitTransactionAfterReturning()方法会调用AbstractPlatformTransactionManager类的commit()方法:

/**
 * Commit entry point that also covers participation in existing transactions
 * and programmatic rollback requests.
 * <p>A status flagged local rollback-only is rolled back; a status flagged global
 * rollback-only is rolled back too unless {@code shouldCommitOnGlobalRollbackOnly()}
 * allows committing; everything else goes to {@code processCommit}.
 * @param status the status of the transaction to complete
 * @throws IllegalTransactionStateException if the transaction is already completed
 * @see org.springframework.transaction.TransactionStatus#isRollbackOnly()
 * @see #doCommit
 * @see #rollback
 */
@Override
public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus txStatus = (DefaultTransactionStatus) status;

    if (txStatus.isLocalRollbackOnly()) {
        // Rollback requested by the transactional code itself.
        if (txStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(txStatus, false);
    }
    else if (!shouldCommitOnGlobalRollbackOnly() && txStatus.isGlobalRollbackOnly()) {
        // Marked rollback-only globally although a commit was requested.
        if (txStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(txStatus, true);
    }
    else {
        processCommit(txStatus);
    }
}

commit()方法会调用processCommit()方法,最终将调用DataSourceTransactionManager类的doCommit()方法,这里将会提交事务:

/**
 * Commit the JDBC transaction on the Connection held by the given status's transaction object.
 * @param status the status whose transaction object carries the ConnectionHolder
 * @throws TransactionSystemException if the Connection's commit raises an SQLException
 */
@Override
protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject transactionObject = (DataSourceTransactionObject) status.getTransaction();
    Connection connection = transactionObject.getConnectionHolder().getConnection();

    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection [" + connection + "]");
    }

    try {
        connection.commit();
    }
    catch (SQLException sqlEx) {
        // Surface the JDBC failure as a transaction-level exception, keeping the cause.
        throw new TransactionSystemException("Could not commit JDBC transaction", sqlEx);
    }
}

如果代理对象执行操作过程中出现了异常,回到TransactionAspectSupport#invokeWithinTransaction() ,将会执行completeTransactionAfterThrowing()方法执行回滚操作:

/**
 * Complete the transaction after the invocation threw.
 * <p>Rolls back when a transaction attribute is present and its {@code rollbackOn}
 * accepts the throwable; otherwise commits. Nothing happens when there is no
 * transaction info or it carries no transaction status.
 * @param txInfo information about the current transaction
 * @param ex throwable encountered
 */
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo == null || txInfo.getTransactionStatus() == null) {
        return;
    }

    if (logger.isTraceEnabled()) {
        logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                "] after exception: " + ex);
    }

    boolean rollbackRequested =
            (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex));

    if (!rollbackRequested) {
        // We don't roll back on this exception.
        // Will still roll back if TransactionStatus.isRollbackOnly() is true.
        try {
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
        catch (TransactionSystemException commitEx) {
            logger.error("Application exception overridden by commit exception", ex);
            commitEx.initApplicationException(ex);
            throw commitEx;
        }
        catch (RuntimeException | Error commitEx) {
            logger.error("Application exception overridden by commit exception", ex);
            throw commitEx;
        }
        return;
    }

    try {
        txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
    }
    catch (TransactionSystemException rollbackEx) {
        // Log the application exception and attach it before the rollback failure replaces it.
        logger.error("Application exception overridden by rollback exception", ex);
        rollbackEx.initApplicationException(ex);
        throw rollbackEx;
    }
    catch (RuntimeException | Error rollbackEx) {
        logger.error("Application exception overridden by rollback exception", ex);
        throw rollbackEx;
    }
}

completeTransactionAfterThrowing()方法里,调用AbstractPlatformTransactionManager类的rollback()方法,最终调用DataSourceTransactionManager的doRollback()方法,这里将会回滚事务:

/**
 * Roll back the JDBC transaction on the Connection held by the given status's transaction object.
 * @param status the status whose transaction object carries the ConnectionHolder
 * @throws TransactionSystemException if the Connection's rollback raises an SQLException
 */
@Override
protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionObject transactionObject = (DataSourceTransactionObject) status.getTransaction();
    Connection connection = transactionObject.getConnectionHolder().getConnection();

    if (status.isDebug()) {
        logger.debug("Rolling back JDBC transaction on Connection [" + connection + "]");
    }

    try {
        connection.rollback();
    }
    catch (SQLException sqlEx) {
        // Surface the JDBC failure as a transaction-level exception, keeping the cause.
        throw new TransactionSystemException("Could not roll back JDBC transaction", sqlEx);
    }
}

 

以上是 Spring事务01TransactionInterceptor 的全部内容, 来源链接: utcz.com/z/513293.html

回到顶部