Springcloud、seata分布式事务解决方案实践——AT模式
Seata官方文档地址: http://seata.io/zh-cn/docs/overview/what-is-seata.html
Spring cloud参考文档: https://spring.io/projects/spring-cloud#learn
本示例源码仓库地址:https://github.com/bugbycode/spring_cloud_dev.git
工作准备
开发工具:Eclipse/Myeclipse/IntelliJ IDEA 任选其一
·运行环境:jdk1.8及以上版本
·数据库:MySQL 5.7
·示例采用框架:Spring boot 2.2.4、Spring cloud Hoxton.SR5 1.3.0、Spring security oauth2、Mybatis、Element ui、Seata 1.3.0
Seata服务部署
1、下载最1.3.0版本的Seata服务,Seata服务下载地址如下:
https://github.com/seata/seata/releases
2、解压后分别修改seata服务配置文件registry.conf和file.conf,修改后内容分别如下所示:
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "eureka"
……
eureka {
serviceUrl = "http:// localhost:8761/eureka"
application = "seata-server"
weight = "1"
}
……
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
……
file {
name = "file.conf"
}
}
## transaction log store, only used in seata-server
store {
## store mode: file、db、redis
mode = "file"
## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}
……
}
3、执行如下命令启动seata服务:
# seata/bin/seata-server.sh --host localhost --port 8799
4、在注册中心查看seata服务运行状况,如下所示:
Spring cloud微服务
集成seata
Maven依赖
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>${seata.version}</version>
</dependency>
seata客户端配置
seata:
enabled: true
application-id: ${spring.application.name}
tx-service-group: ${spring.application.name}_group
enable-auto-data-source-proxy: true
use-jdk-proxy: false
excludes-for-auto-proxying: firstClassNameForExclude,secondClassNameForExclude
client:
rm:
async-commit-buffer-limit: 1000
report-retry-count: 5
table-meta-check-enable: false
report-success-enable: false
saga-branch-register-enable: false
lock:
retry-interval: 10
retry-times: 30
retry-policy-branch-rollback-on-conflict: true
tm:
degrade-check: false
degrade-check-period: 2000
degrade-check-allow-times: 10
commit-retry-count: 5
rollback-retry-count: 5
undo:
data-validation: true
log-serialization: jackson
log-table: undo_log
only-care-update-columns: true
log:
exceptionRate: 100
service:
vgroupMapping:
${spring.application.name}_group: seata-server
enable-degrade: false
disable-global-transaction: false
transport:
shutdown:
wait: 3
thread-factory:
boss-thread-prefix: NettyBoss
worker-thread-prefix: NettyServerNIOWorker
server-executor-thread-prefix: NettyServerBizHandler
share-boss-worker: false
client-selector-thread-prefix: NettyClientSelector
client-selector-thread-size: 1
client-worker-thread-prefix: NettyClientWorkerThread
worker-thread-size: default
boss-thread-size: 1
type: TCP
server: NIO
heartbeat: true
serialization: seata
compressor: none
enable-client-batch-send-request: true
config:
type: file
registry:
type: eureka
eureka:
weight: 1
service-url: http://localhost:8761/eureka
配置数据源
1、本示例采用初始化数据源以及Mybatis的SqlSessionFactory代码如下所示:
@Configuration
@MapperScan(basePackages = "com.fort.mapper",sqlSessionFactoryRef = "sqlSessionFactory")
public class DataSourceConfig {
@Bean("dataSource")
@ConfigurationProperties(prefix="spring.server.datasource")
public DataSource getDataSource() {
return DataSourceBuilder.create(BasicDataSource.class.getClassLoader()).build();
}
@Bean("sqlSessionFactory")
public SqlSessionFactory getSqlSessionFactory() throws Exception {
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
SqlSessionFactoryBean sf = new SqlSessionFactoryBean();
sf.setDataSource(getDataSource());
sf.setConfigLocation(resolver.getResource("classpath:mybatis/config/mybatis-config.xml"));
sf.setMapperLocations(resolver.getResources("classpath:mybatis/mapper/*/*.xml"));
return sf.getObject();
}
/**
* 1、在需要事务管理的地方加@Transactional 注解。@Transactional 注解可以被应用于接口定义和接口方法、类定义和类的 public 方法上。
* 2、@Transactional 注解只能应用到 public 可见度的方法上。 如果你在 protected、private 或者 package-visible 的方法上使用 @Transactional 注解,它也不会报错, 但是这个被注解的方法将不会展示已配置的事务设置。
* 3、注意仅仅 @Transactional 注解的出现不足于开启事务行为,它仅仅 是一种元数据。必须在配置文件中使用配置元素,才真正开启了事务行为。
* 4、通过 元素的 “proxy-target-class” 属性值来控制是基于接口的还是基于类的代理被创建。如果 “proxy-target-class” 属值被设置为 “true”,那么基于类的代理将起作用(这时需要CGLIB库cglib.jar在CLASSPATH中)。如果 “proxy-target-class” 属值被设置为 “false” 或者这个属性被省略,那么标准的JDK基于接口的代理将起作用。
* 5、Spring团队建议在具体的类(或类的方法)上使用 @Transactional 注解,而不要使用在类所要实现的任何接口上。在接口上使用 @Transactional 注解,只能当你设置了基于接口的代理时它才生效。因为注解是 不能继承 的,这就意味着如果正在使用基于类的代理时,那么事务的设置将不能被基于类的代理所识别,而且对象也将不会被事务代理所包装。
* 6、@Transactional 的事务开启 ,或者是基于接口的 或者是基于类的代理被创建。所以在同一个类中一个方法调用另一个方法有事务的方法,事务是不会起作用的。
* @param dataSource
* @return
*/
@Bean("transactionManager")
@Resource(name="dataSource")
public DataSourceTransactionManager getTransactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
2、yml配置如下
spring:
server:
datasource:
url: jdbc:mysql://localhost:3306/fort?serverTimezone=Hongkong&useUnicode=true&characterEncoding=UTF-8&useSSL=false&autoReconnect=true
username: root
password: root
driverClassName: com.mysql.cj.jdbc.Driver
initialSize: 10
maxIdle: 20
minIdle: 10
maxTotal: 100
本地事务使用AOP管理(可选项)
@Aspect
@Configuration
public class TransactionAdviceConfig {
@Autowired
private PlatformTransactionManager transactionManager;
@Bean
public TransactionInterceptor txAdvice() {
DefaultTransactionAttribute txAttr_REQUIRED = new DefaultTransactionAttribute();
DefaultTransactionAttribute txAttr_READONLY = new DefaultTransactionAttribute();
NameMatchTransactionAttributeSource source = new NameMatchTransactionAttributeSource();
//如果不存在事务则创建事务
txAttr_REQUIRED.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
txAttr_READONLY.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED );
//只读
txAttr_READONLY.setReadOnly(true);
//配置事务的传播特性,定义事务会传播到那些方法上
source.addTransactionalMethod("save*", txAttr_REQUIRED);
source.addTransactionalMethod("add*", txAttr_REQUIRED);
source.addTransactionalMethod("insert*", txAttr_REQUIRED);
source.addTransactionalMethod("update*", txAttr_REQUIRED);
source.addTransactionalMethod("delete*", txAttr_REQUIRED);
source.addTransactionalMethod("*", txAttr_READONLY);//只读
return new TransactionInterceptor(transactionManager, source);
}
@Bean
public Advisor txAdviceAdvisor() {
AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
//使用aop 定义事务,expression表示定义的是:事务要使用在那些方法上,相当于定义事务的边界
pointcut.setExpression("execution(* com.fort.service.*.impl.*.*(..))");
return new DefaultPointcutAdvisor(pointcut, txAdvice());
}
}
事务发起方
1、在事务发起方的方法上使用@GlobalTransactional注解,代码如下所示:
/**
* 修改设备信息
*
* @param asset
* @return
*/
@GlobalTransactional
@PostMapping("/updateById")
public int updateById(@RequestBody Asset asset) {
String xid = RootContext.getXID();
logger.info("My xid as : " + xid);
int rows = assetService.updateById(asset);
if("mytest".equals(asset.getName())) {
throw new FortException(FortError.SERVER_ERROR);
}
return rows;
}
2、assetService.updateById(Asset asset) 函数内使用Open Feign客户端调用授权服务的修改授权规则信息API,伪代码如下所示:
@Override
public int updateById(Asset asset) {
……
for(Rule r : ruleList) {
//修改授权规则信息
ruleFeignClient.updateById(r.getId());
}
……
}
异常处理
全局异常捕获
/**
* 全局异常捕获
* @author zhigongzhang
*
*/
@RestControllerAdvice
public class GloblaController {
/**
* 服务器内部错误 响应 500
* @param e
* @return
*/
@ExceptionHandler({Exception.class})
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public Map<String,Object> handlerException(Exception e){
Map<String,Object> result = new HashMap<String,Object>();
result.put("error", "服务器内部错误");
result.put("error_description", e.getLocalizedMessage());
return result;
}
/**
* 服务器内部错误 响应 500
* @param e
* @return
*/
@ExceptionHandler({RuntimeException.class})
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public Map<String,Object> handlerRuntimeException(RuntimeException e){
Map<String,Object> result = new HashMap<String,Object>();
result.put("error", "服务器内部错误");
result.put("error_description", e.getLocalizedMessage());
return result;
}
/**
* 无权访问 响应 403
* @param e
* @return
*/
@ExceptionHandler({AccessDeniedException.class})
@ResponseStatus(HttpStatus.FORBIDDEN)
public Map<String,Object> handlerAccessDeniedException(AccessDeniedException e){
Map<String,Object> result = new HashMap<String,Object>();
result.put("error", "无权访问");
result.put("error_description", e.getLocalizedMessage());
return result;
}
}
自定义异常
/**
* 自定义异常
*
* @author zhigongzhang
*
*/
public class FortException extends RuntimeException {
/**
*
*/
private static final long serialVersionUID = 4334296913237595250L;
public FortException(FortError error) {
super(error.toString());
}
}
自定义错误信息
public enum FortError {
SERVER_ERROR(1000,"自定义错误消息");
private int code;
private String message;
private FortError(int code, String message) {
this.code = code;
this.message = message;
}
public String toString() {
return String.format("%d-%s", this.code,this.message);
}
}
前端服务
登录界面
用户管理模块
设备管理模块
运维授权模块
分布式事务测试
添加事务回滚逻辑
1、在事务发起方添加异常算法,当设备名称为mytest时抛出自定义异常,伪代码如下所示:
@GlobalTransactional
@PostMapping("/updateById")
public int updateById(@RequestBody Asset asset) {
String xid = RootContext.getXID();
logger.info("My xid as : " + xid);
int rows = assetService.updateById(asset);
if("mytest".equals(asset.getName())) {
throw new FortException(FortError.SERVER_ERROR);
}
return rows;
}
分布式事务回滚测试
1、编辑设备信息,将名称改为mytest,如下所示:
2、提交设备信息后效果如下所示显示我们刚才自定义异常信息:
3、分别查看授权服务、设备服务分布式事务回滚日志信息如下所示:
总结
seata虽然可以解决分布式事务,但是解决分布式事务最好的方案就是避免分布式事务,因为解决分布式事务的主要意义在于强一致性,隔离性是很难得到保证的。
以上是 Springcloud、seata分布式事务解决方案实践——AT模式 的全部内容, 来源链接: utcz.com/z/518954.html