seataat模式工作原理
这个项目调试起来还是挺麻烦的,因为你首先需要启动注册中心和seata管理服务,并且需要三个客户端来模拟分布式事务的进行,项目结构如下(盗图一张)
tc:seata提供的事务管理中心。
tm:事务的发起者,并且最终与tc通信告诉事务的成功与否
rm:单个的微服务,也就是最终的资源管理者
实际上这张图不是很恰当,一般的业务流程中并不会有一个单独的business tm出现,实际上比较贴合的流程应该是在创建order的时候,会同时发起事务,并且最终会通过其它服务的返回信息,来决定提交来回滚,所以order服务应该即是tm又是rm,但是我又不会画图,那就这么着吧。
使用at模式,我们除了配置seata相关参数,数据源代理之外。只需要在需要进行事务控制的方法上加上@GlobalTranscation注解就可以了,那么看一下这个注解的代理方法,对于它的处理,在GlobalTransactionScanner这里做了处理,我们来重点看一下这个类。
首先看一下它的继承关系,
public class GlobalTransactionScanner extends AbstractAutoProxyCreatorimplements InitializingBean, ApplicationContextAware,
DisposableBean
它分别继承了AbstractAutoPrxoyCreator用来实现aop,并且继承了InitializingBean, ApplicationContextAware, DisposableBean来在spring bean的钩子函数中进行一些初始化的处理。
方法initClient在afterPropertiesSet钩子函数中被调用到
private void initClient() {if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(
"applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
}
//这里的初始化是通过注册中心获取到 tm的地址,并且初始化tcp连接
TMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Transaction Manager Client is initialized. applicationId[" + applicationId + "] txServiceGroup["
+ txServiceGroup + "]");
}
// 同上
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Resource Manager is initialized. applicationId[" + applicationId + "] txServiceGroup[" + txServiceGroup
+ "]");
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
registerSpringShutdownHook();
}
再来看一下重写的wrapIfNecessary方法
@Overrideprotected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {if (disableGlobalTransaction) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
// 使用tcc模式
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (interceptor == null) {
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
}
}
LOGGER.info(
"Bean[" + bean.getClass().getName() + "] with name [" + beanName + "] would use interceptor ["
+ interceptor.getClass().getName() + "]");
if (!AopUtils.isAopProxy(bean)) {
// 执行父类的方法
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
// 获取这个bean的advised
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
// 把advised包装成advisor
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
在postProcessBeforeInitialization中,对于dataSource进行处理
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {// 如果bean是datasource且不是seata代理过得
if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Auto proxy of [" + beanName + "]");
}
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);
// 通过cglib进行增强
return Enhancer.create(bean.getClass(), (org.springframework.cglib.proxy.MethodInterceptor) (o, method, args, methodProxy) -> {
Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
if (null != m) {
return m.invoke(dataSourceProxy, args);
} else {
boolean oldAccessible = method.isAccessible();
try {
method.setAccessible(true);
return method.invoke(bean, args);
} finally {
//recover the original accessible for security reason
method.setAccessible(oldAccessible);
}
}
});
}
return bean;
}
其实这一段大部分都是spring的知识,为什么这里要说一遍,因为看的太久已经忘了,所以复习一下。真正干活的其实还是GlobalTransactionalInterceptor,那我们再来分析一下它
@Overridepublic Object invoke(final MethodInvocation methodInvocation) throws Throwable {Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}
invoke方法分别根据注解的不同来进行逻辑处理,我们重点看handleGlobalTranscation方法的执行过程
private Object handleGlobalTransaction(final MethodInvocation methodInvocation,final GlobalTransactional globalTrxAnno) throws Throwable {
try {
1.transactionalTemplate execute 传递进来的executor
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
public String name() {
String name = globalTrxAnno.name();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}
// 获取transaction的info
@Override
public TransactionInfo getTransactionInfo() {
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
transactionInfo.setName(name());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.rollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
2 分类对于事务错误进行解决
} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
throw e.getCause();
default:
throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);
}
}
}
1 处其实真正干活还是它来干的
public Object execute(TransactionalExecutor business) throws Throwable {// 1.1新建一个事务
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.2 获取事务信息
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {
// 1.3 开始事务
beginTransaction(txInfo, tx);
Object rs = null;
try {
// business 执行
rs = business.execute();
} catch (Throwable ex) {
// 1.4 异常处理 进行回滚等操作
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}
// 提交事务
commitTransaction(tx);
return rs;
} finally {
// 执行钩子函数
triggerAfterCompletion();
cleanUp();
}
}
1.1 处会新建一个GlobalTranscation,此时xid还未生成,为null
DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {this.transactionManager = TransactionManagerHolder.get();
this.xid = xid;
this.status = status;
this.role = role;
}
1.2 会根据注解来获取事务的一些配置信息,回滚规则,超时时间等等
public TransactionInfo getTransactionInfo() {TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
transactionInfo.setName(name());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.rollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
1.3 事务的发起
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {
// 新建一个事务开始的request
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
// 获取返回值
GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
// 返回一个xid
return response.getXid();
}
1.4 事务的回滚
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable ex) throws TransactionalExecutor.ExecutionException {//如果没有事务信息而且回滚信息匹配
if (txInfo != null && txInfo.rollbackOn(ex)) {
try {
// 执行回滚操作
rollbackTransaction(tx, ex);
} catch (TransactionException txe) {
// 这里回滚操作也有可能失败 在配置中可以配置事务回滚的次数
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);
}
} else {
// 提交
commitTransaction(tx);
}
}
以上是 seataat模式工作原理 的全部内容, 来源链接: utcz.com/z/514116.html