seataat模式工作原理

编程

    这个项目调试起来还是挺麻烦的,因为你首先需要启动注册中心和seata管理服务,并且需要三个客户端来模拟分布式事务的进行,项目结构如下(盗图一张)


        tc:seata提供的事务管理中心。

        tm:事务的发起者,并且最终与tc通信告诉事务的成功与否

        rm:单个的微服务,也就是最终的资源管理者

        实际上这张图不是很恰当,一般的业务流程中并不会有一个单独的business tm出现,实际上比较贴合的流程应该是在创建order的时候,会同时发起事务,并且最终会通过其它服务的返回信息,来决定提交来回滚,所以order服务应该即是tm又是rm,但是我又不会画图,那就这么着吧。

        使用at模式,我们除了配置seata相关参数,数据源代理之外。只需要在需要进行事务控制的方法上加上@GlobalTranscation注解就可以了,那么看一下这个注解的代理方法,对于它的处理,在GlobalTransactionScanner这里做了处理,我们来重点看一下这个类。

        首先看一下它的继承关系,

public class GlobalTransactionScanner extends AbstractAutoProxyCreator

implements InitializingBean, ApplicationContextAware,

DisposableBean

        它分别继承了AbstractAutoPrxoyCreator用来实现aop,并且继承了InitializingBean, ApplicationContextAware, DisposableBean来在spring bean的钩子函数中进行一些初始化的处理。

        方法initClient在afterPropertiesSet钩子函数中被调用到

private void initClient() {

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Initializing Global Transaction Clients ... ");

}

if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {

throw new IllegalArgumentException(

"applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);

}

//这里的初始化是通过注册中心获取到 tm的地址,并且初始化tcp连接

TMClient.init(applicationId, txServiceGroup);

if (LOGGER.isInfoEnabled()) {

LOGGER.info(

"Transaction Manager Client is initialized. applicationId[" + applicationId + "] txServiceGroup["

+ txServiceGroup + "]");

}

// 同上

RMClient.init(applicationId, txServiceGroup);

if (LOGGER.isInfoEnabled()) {

LOGGER.info(

"Resource Manager is initialized. applicationId[" + applicationId + "] txServiceGroup[" + txServiceGroup

+ "]");

}

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Global Transaction Clients are initialized. ");

}

registerSpringShutdownHook();

}

        再来看一下重写的wrapIfNecessary方法

@Override

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {

if (disableGlobalTransaction) {

return bean;

}

try {

synchronized (PROXYED_SET) {

if (PROXYED_SET.contains(beanName)) {

return bean;

}

interceptor = null;

// 使用tcc模式

if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {

interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));

} else {

Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);

Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

if (!existsAnnotation(new Class[]{serviceInterface})

&& !existsAnnotation(interfacesIfJdk)) {

return bean;

}

if (interceptor == null) {

interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);

}

}

LOGGER.info(

"Bean[" + bean.getClass().getName() + "] with name [" + beanName + "] would use interceptor ["

+ interceptor.getClass().getName() + "]");

if (!AopUtils.isAopProxy(bean)) {

                // 执行父类的方法

bean = super.wrapIfNecessary(bean, beanName, cacheKey);

} else {

                // 获取这个beanadvised

AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);

                // 把advised包装成advisor

Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));

for (Advisor avr : advisor) {

advised.addAdvisor(0, avr);

}

}

PROXYED_SET.add(beanName);

return bean;

}

} catch (Exception exx) {

throw new RuntimeException(exx);

}

}

        在postProcessBeforeInitialization中,对于dataSource进行处理

public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {

    // 如果bean是datasource且不是seata代理过得

if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Auto proxy of [" + beanName + "]");

}

DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);

        // 通过cglib进行增强

return Enhancer.create(bean.getClass(), (org.springframework.cglib.proxy.MethodInterceptor) (o, method, args, methodProxy) -> {

Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());

if (null != m) {

return m.invoke(dataSourceProxy, args);

} else {

boolean oldAccessible = method.isAccessible();

try {

method.setAccessible(true);

return method.invoke(bean, args);

} finally {

//recover the original accessible for security reason

method.setAccessible(oldAccessible);

}

}

});

}

return bean;

}

        其实这一段大部分都是spring的知识,为什么这里要说一遍,因为看的太久已经忘了,所以复习一下。真正干活的其实还是GlobalTransactionalInterceptor,那我们再来分析一下它

@Override

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {

Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);

Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);

final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);

final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);

if (globalTransactionalAnnotation != null) {

return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);

} else if (globalLockAnnotation != null) {

return handleGlobalLock(methodInvocation);

} else {

return methodInvocation.proceed();

}

}

        invoke方法分别根据注解的不同来进行逻辑处理,我们重点看handleGlobalTranscation方法的执行过程

private Object handleGlobalTransaction(final MethodInvocation methodInvocation,

final GlobalTransactional globalTrxAnno) throws Throwable {

try {

        1.transactionalTemplate execute 传递进来的executor

return transactionalTemplate.execute(new TransactionalExecutor() {

@Override

public Object execute() throws Throwable {

return methodInvocation.proceed();

}

public String name() {

String name = globalTrxAnno.name();

if (!StringUtils.isNullOrEmpty(name)) {

return name;

}

return formatMethod(methodInvocation.getMethod());

}

            // 获取transaction的info

@Override

public TransactionInfo getTransactionInfo() {

TransactionInfo transactionInfo = new TransactionInfo();

transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());

transactionInfo.setName(name());

Set<RollbackRule> rollbackRules = new LinkedHashSet<>();

for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {

rollbackRules.add(new RollbackRule(rbRule));

}

for (String rbRule : globalTrxAnno.rollbackForClassName()) {

rollbackRules.add(new RollbackRule(rbRule));

}

for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {

rollbackRules.add(new NoRollbackRule(rbRule));

}

for (String rbRule : globalTrxAnno.noRollbackForClassName()) {

rollbackRules.add(new NoRollbackRule(rbRule));

}

transactionInfo.setRollbackRules(rollbackRules);

return transactionInfo;

}

});

    2 分类对于事务错误进行解决

} catch (TransactionalExecutor.ExecutionException e) {

TransactionalExecutor.Code code = e.getCode();

switch (code) {

case RollbackDone:

throw e.getOriginalException();

case BeginFailure:

failureHandler.onBeginFailure(e.getTransaction(), e.getCause());

throw e.getCause();

case CommitFailure:

failureHandler.onCommitFailure(e.getTransaction(), e.getCause());

throw e.getCause();

case RollbackFailure:

failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());

throw e.getCause();

default:

throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);

}

}

}

        1 处其实真正干活还是它来干的

public Object execute(TransactionalExecutor business) throws Throwable {

// 1.1新建一个事务

GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

// 1.2 获取事务信息

TransactionInfo txInfo = business.getTransactionInfo();

if (txInfo == null) {

throw new ShouldNeverHappenException("transactionInfo does not exist");

}

try {

// 1.3 开始事务

beginTransaction(txInfo, tx);

Object rs = null;

try {

// business 执行

rs = business.execute();

} catch (Throwable ex) {

// 1.4 异常处理 进行回滚等操作

completeTransactionAfterThrowing(txInfo,tx,ex);

throw ex;

}

// 提交事务

commitTransaction(tx);

return rs;

} finally {

// 执行钩子函数

triggerAfterCompletion();

cleanUp();

}

}

1.1 处会新建一个GlobalTranscation,此时xid还未生成,为null

DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {

this.transactionManager = TransactionManagerHolder.get();

this.xid = xid;

this.status = status;

this.role = role;

}

1.2 会根据注解来获取事务的一些配置信息,回滚规则,超时时间等等

public TransactionInfo getTransactionInfo() {

TransactionInfo transactionInfo = new TransactionInfo();

transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());

transactionInfo.setName(name());

Set<RollbackRule> rollbackRules = new LinkedHashSet<>();

for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {

rollbackRules.add(new RollbackRule(rbRule));

}

for (String rbRule : globalTrxAnno.rollbackForClassName()) {

rollbackRules.add(new RollbackRule(rbRule));

}

for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {

rollbackRules.add(new NoRollbackRule(rbRule));

}

for (String rbRule : globalTrxAnno.noRollbackForClassName()) {

rollbackRules.add(new NoRollbackRule(rbRule));

}

transactionInfo.setRollbackRules(rollbackRules);

return transactionInfo;

}

1.3 事务的发起

public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)

throws TransactionException {

    // 新建一个事务开始的request

GlobalBeginRequest request = new GlobalBeginRequest();

request.setTransactionName(name);

request.setTimeout(timeout);

    // 获取返回值

GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);

if (response.getResultCode() == ResultCode.Failed) {

throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());

}

    // 返回一个xid

return response.getXid();

}

1.4 事务的回滚

private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable ex) throws TransactionalExecutor.ExecutionException {

//如果没有事务信息而且回滚信息匹配

if (txInfo != null && txInfo.rollbackOn(ex)) {

try {

            // 执行回滚操作

rollbackTransaction(tx, ex);

} catch (TransactionException txe) {

// 这里回滚操作也有可能失败 在配置中可以配置事务回滚的次数

throw new TransactionalExecutor.ExecutionException(tx, txe,

TransactionalExecutor.Code.RollbackFailure, ex);

}

} else {

// 提交

commitTransaction(tx);

}

}

 

 

 

以上是 seataat模式工作原理 的全部内容, 来源链接: utcz.com/z/514116.html

回到顶部