分布式之分布式事务、分布式锁、接口幂等性、分布式session

coding

一、分布式session

  session 是啥?浏览器有个 cookie,在一段时间内这个 cookie 都存在,然后每次发请求过来都带上一个特殊的 jsessionid cookie,就根据这个东西,在服务端可以维护一个对应的 session 域,里面可以放点数据。

  一般的话只要你没关掉浏览器,cookie 还在,那么对应的那个 session 就在,但是如果 cookie 没了,session 也就没了。常见于什么购物车之类的东西,还有登录状态保存之类的。

  这个不多说了,懂 Java 的都该知道这个。

  单块系统的时候这么玩儿 session 没问题,但是你要是分布式系统呢,那么多的服务,session 状态在哪儿维护啊?

  (1)完全不用 session

  使用 JWT Token 储存用户身份,然后再从数据库或者 cache 中获取其他的信息。这样无论请求分配到哪个服务器都无所谓

  (2)tomcat + redis

  这个其实还挺方便的,就是使用 session 的代码,跟以前一样,还是基于 tomcat 原生的 session 支持即可,然后就是用一个叫做 Tomcat RedisSessionManager 的东西,让所有我们部署的 tomcat 都将 session 数据存储到 redis 即可。

  在 tomcat 的配置文件中配置:

<Valve className="com.orangefunction.tomcat.redissessions.RedisSessionHandlerValve" />

<Manager className="com.orangefunction.tomcat.redissessions.RedisSessionManager"

host="{redis.host}"

port="{redis.port}"

database="{redis.dbnum}"

maxInactiveInterval="60"/>

  然后指定 redis 的 host 和 port 就 ok 了。

<Valve className="com.orangefunction.tomcat.redissessions.RedisSessionHandlerValve" />

<Manager className="com.orangefunction.tomcat.redissessions.RedisSessionManager"

sentinelMaster="mymaster"

sentinels="<sentinel1-ip>:26379,<sentinel2-ip>:26379,<sentinel3-ip>:26379"

maxInactiveInterval="60"/>

  还可以用上面这种方式基于 redis 哨兵支持的 redis 高可用集群来保存 session 数据,都是 ok 的。

  (3)spring session + redis

  上面所说的第二种方式会与 tomcat 容器重耦合,如果我要将 web 容器迁移成 jetty,难道还要重新把 jetty 都配置一遍?

  因为上面那种 tomcat + redis 的方式好用,但是会严重依赖于web容器,不好将代码移植到其他 web 容器上去,尤其是你要是换了技术栈咋整?比如换成了 spring cloud 或者是 spring boot 之类的呢?

  所以现在比较好的还是基于 Java 一站式解决方案,也就是 spring。人家 spring 基本上承包了大部分我们需要使用的框架,spirng cloud 做微服务,spring boot 做脚手架,所以用 sping session 是一个很好的选择。

  在 pom.xml 中配置:

<dependency>

<groupId>org.springframework.session</groupId>

<artifactId>spring-session-data-redis</artifactId>

<version>1.2.1.RELEASE</version>

</dependency>

<dependency>

<groupId>redis.clients</groupId>

<artifactId>jedis</artifactId>

<version>2.8.1</version>

</dependency>

  在 spring 配置文件中配置:

<bean id="redisHttpSessionConfiguration"

class="org.springframework.session.data.redis.config.annotation.web.http.RedisHttpSessionConfiguration">

<property name="maxInactiveIntervalInSeconds" value="600"/>

</bean>

<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">

<property name="maxTotal" value="100" />

<property name="maxIdle" value="10" />

</bean>

<bean id="jedisConnectionFactory"

class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy">

<property name="hostName" value="${redis_hostname}"/>

<property name="port" value="${redis_port}"/>

<property name="password" value="${redis_pwd}" />

<property name="timeout" value="3000"/>

<property name="usePool" value="true"/>

<property name="poolConfig" ref="jedisPoolConfig"/>

</bean>

  在 web.xml 中配置:

<filter>

<filter-name>springSessionRepositoryFilter</filter-name>

<filter-class>org.springframework.web.filter.DelegatingFilterProxy</filter-class>

</filter>

<filter-mapping>

<filter-name>springSessionRepositoryFilter</filter-name>

<url-pattern>/*</url-pattern>

</filter-mapping>

  示例代码:

@RestController

@RequestMapping("/test")

publicclass TestController {

@RequestMapping("/putIntoSession")

public String putIntoSession(HttpServletRequest request, String username) {

request.getSession().setAttribute("name", "leo");

return "ok";

}

@RequestMapping("/getFromSession")

public String getFromSession(HttpServletRequest request, Model model){

String name = request.getSession().getAttribute("name");

return name;

}

}

  上面的代码就是 ok 的,给 sping session 配置基于 redis 来存储 session 数据,然后配置了一个 spring session 的过滤器,这样的话,session 相关操作都会交给 spring session 来管了。接着在代码中,就用原生的 session 操作,就是直接基于 spring sesion 从 redis 中获取数据了。

  实现分布式的会话有很多种方式,我说的只不过是比较常见的几种方式,tomcat + redis 早期比较常用,但是会重耦合到 tomcat 中;近些年,通过 spring session 来实现。

二、分布式事务

  当我们的单个数据库的性能产生瓶颈的时候,我们可能会对数据库进行分区,这里所说的分区指的是物理分区,分区之后可能不同的库就处于不同的服务器上了,这个时候单个数据库的ACID已经不能适应这种情况了,而在这种ACID的集群环境下,再想保证集群的ACID几乎是很难达到,或者即使能达到那么效率和性能会大幅下降,最为关键的是再很难扩展新的分区了,这个时候如果再追求集群的ACID会导致我们的系统变得很差,这时我们就需要引入一个新的理论原则来适应这种集群的情况,就是 CAP 原则或者叫CAP定理,那么CAP定理指的是什么呢?

  

CAP定理

  CAP定理是由加州大学伯克利分校Eric Brewer教授提出来的,他指出WEB服务无法同时满足一下3个属性:

  • 一致性(Consistency) : 客户端知道一系列的操作都会同时发生(生效)
  • 可用性(Availability) : 每个操作都必须以可预期的响应结束
  • 分区容错性(Partition tolerance) : 即使出现单个组件无法可用,操作依然可以完成

  具体地讲在分布式系统中,在任何数据库设计中,一个Web应用至多只能同时支持上面的两个属性。显然,任何横向扩展策略都要依赖于数据分区。因此,设计人员必须在一致性与可用性之间做出选择。

  这个定理在迄今为止的分布式系统中都是适用的! 为什么这么说呢?

  这个时候有同学可能会把数据库的2PC(两阶段提交)搬出来说话了。OK,我们就来看一下数据库的两阶段提交。

  对数据库分布式事务有了解的同学一定知道数据库支持的2PC,又叫做 XA Transactions。

其中,XA 是一个两阶段提交协议,该协议分为以下两个阶段:

  • 第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交.
  • 第二阶段:事务协调器要求每个数据库提交数据。

  其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。这样做的缺陷是什么呢? 咋看之下我们可以在数据库分区之间获得一致性。

  如果CAP 定理是对的,那么它一定会影响到可用性。

  如果说系统的可用性代表的是执行某项操作相关所有组件的可用性的和。那么在两阶段提交的过程中,可用性就代表了涉及到的每一个数据库中可用性的和。我们假设两阶段提交的过程中每一个数据库都具有99.9%的可用性,那么如果两阶段提交涉及到两个数据库,这个结果就是99.8%。根据系统可用性计算公式,假设每个月43200分钟,99.9%的可用性就是43157分钟, 99.8%的可用性就是43114分钟,相当于每个月的宕机时间增加了43分钟。

  以上,可以验证出来,CAP定理从理论上来讲是正确的,CAP我们先看到这里,等会再接着说。

  在分布式系统中,要实现分布式事务,无外乎那几种解决方案。

分布式事务的实现主要有以下 5 种方案:

  • XA 方案
  • TCC 方案
  • 本地消息表
  • 可靠消息最终一致性方案
  • 最大努力通知方案

  两阶段提交方案/XA方案

  所谓的 XA 方案,即:两阶段提交,有一个事务管理器的概念,负责协调多个数据库(资源管理器)的事务,事务管理器先问问各个数据库你准备好了吗?如果每个数据库都回复 ok,那么就正式提交事务,在各个数据库上执行操作;如果任何其中一个数据库回答不 ok,那么就回滚事务。

  这种分布式事务方案,比较适合单块应用里,跨多个库的分布式事务,而且因为严重依赖于数据库层面来搞定复杂的事务,效率很低,绝对不适合高并发的场景。如果要玩儿,那么基于 Spring + JTA 就可以搞定,自己随便搜个 demo 看看就知道了。

  这个方案,我们很少用,一般来说某个系统内部如果出现跨多个库的这么一个操作,是不合规的。我可以给大家介绍一下, 现在微服务,一个大的系统分成几十个甚至几百个服务。一般来说,我们的规定和规范,是要求每个服务只能操作自己对应的一个数据库。

  如果你要操作别的服务对应的库,不允许直连别的服务的库,违反微服务架构的规范,你随便交叉胡乱访问,几百个服务的话,全体乱套,这样的一套服务是没法管理的,没法治理的,可能会出现数据被别人改错,自己的库被别人写挂等情况。

  如果你要操作别人的服务的库,你必须是通过调用别的服务的接口来实现,绝对不允许交叉访问别人的数据库。

  TCC 方案

  TCC 的全称是:TryConfirmCancel

  • Try 阶段:这个阶段说的是对各个服务的资源做检测以及对资源进行锁定或者预留。
  • Confirm 阶段:这个阶段说的是在各个服务中执行实际的操作。
  • Cancel 阶段:如果任何一个服务的业务方法执行出错,那么这里就需要进行补偿,就是执行已经执行成功的业务逻辑的回滚操作。(把那些执行成功的回滚)

  这种方案说实话几乎很少人使用,我们用的也比较少,但是也有使用的场景。因为这个事务回滚实际上是严重依赖于你自己写代码来回滚和补偿了,会造成补偿代码巨大,非常之恶心。

  比如说我们,一般来说跟钱相关的,跟钱打交道的,支付、交易相关的场景,我们会用 TCC,严格保证分布式事务要么全部成功,要么全部自动回滚,严格保证资金的正确性,保证在资金上不会出现问题。

  而且最好是你的各个业务执行的时间都比较短。

  但是说实话,一般尽量别这么搞,自己手写回滚逻辑,或者是补偿逻辑,实在太恶心了,那个业务代码是很难维护的。

  本地消息表

  本地消息表其实是国外的 ebay 搞出来的这么一套思想。

  这个大概意思是这样的:

  1. A 系统在自己本地一个事务里操作同时,插入一条数据到消息表;
  2. 接着 A 系统将这个消息发送到 MQ 中去;
  3. B 系统接收到消息之后,在一个事务里,往自己本地消息表里插入一条数据,同时执行其他的业务操作,如果这个消息已经被处理过了,那么此时这个事务会回滚,这样保证不会重复处理消息;
  4. B 系统执行成功之后,就会更新自己本地消息表的状态以及 A 系统消息表的状态;
  5. 如果 B 系统处理失败了,那么就不会更新消息表状态,那么此时 A 系统会定时扫描自己的消息表,如果有未处理的消息,会再次发送到 MQ 中去,让 B 再次处理;
  6. 这个方案保证了最终一致性,哪怕 B 事务失败了,但是 A 会不断重发消息,直到 B 那边成功为止。

  这个方案说实话最大的问题就在于严重依赖于数据库的消息表来管理事务啥的,如果是高并发场景咋办呢?咋扩展呢?所以一般确实很少用。

  可靠消息最终一致性方案

  这个的意思,就是干脆不要用本地的消息表了,直接基于 MQ 来实现事务。比如阿里的 RocketMQ 就支持消息事务。

  大概的意思就是:

  1. A 系统先发送一个 prepared 消息到 mq,如果这个 prepared 消息发送失败那么就直接取消操作别执行了;
  2. 如果这个消息发送成功过了,那么接着执行本地事务,如果成功就告诉 mq 发送确认消息,如果失败就告诉 mq 回滚消息;
  3. 如果发送了确认消息,那么此时 B 系统会接收到确认消息,然后执行本地的事务;
  4. mq 会自动定时轮询所有 prepared 消息回调你的接口,问你,这个消息是不是本地事务处理失败了,所有没发送确认的消息,是继续重试还是回滚?一般来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。这个就是避免可能本地事务执行成功了,而确认消息却发送失败了。
  5. 这个方案里,要是系统 B 的事务失败了咋办?重试咯,自动不断重试直到成功,如果实在是不行,要么就是针对重要的资金类业务进行回滚,比如 B 系统本地回滚后,想办法通知系统 A 也回滚;或者是发送报警由人工来手工回滚和补偿。
  6. 这个还是比较合适的,目前国内互联网公司大都是这么玩儿的,要不你举用 RocketMQ 支持的,要不你就自己基于类似 ActiveMQ?RabbitMQ?自己封装一套类似的逻辑出来,总之思路就是这样子的。

  最大努力通知方案

  这个方案的大致意思就是:

  1. 系统 A 本地事务执行完之后,发送个消息到 MQ;
  2. 这里会有个专门消费 MQ 的最大努力通知服务,这个服务会消费 MQ 然后写入数据库中记录下来,或者是放入个内存队列也可以,接着调用系统 B 的接口;
  3. 要是系统 B 执行成功就 ok 了;要是系统 B 执行失败了,那么最大努力通知服务就定时尝试重新调用系统 B,反复 N 次,最后还是不行就放弃。

  你们公司是如何处理分布式事务的?

  如果你真的被问到,可以这么说,我们某某特别严格的场景,用的是 TCC 来保证强一致性;然后其他的一些场景基于阿里的 RocketMQ 来实现分布式事务。

  你找一个严格资金要求绝对不能错的场景,你可以说你是用的 TCC 方案;如果是一般的分布式事务场景,订单插入之后要调用库存服务更新库存,库存数据没有资金那么的敏感,可以用可靠消息最终一致性方案。

  友情提示一下,RocketMQ 3.2.6 之前的版本,是可以按照上面的思路来的,但是之后接口做了一些改变,我这里不再赘述了。

当然如果你愿意,你可以参考可靠消息最终一致性方案来自己实现一套分布式事务,比如基于 RocketMQ 来玩儿。

三、分布式锁

  redis 分布式锁

  redis 最普通的分布式锁

  第一个最普通的实现方式,就是在 redis 里创建一个 key,这样就算加锁。

SET my:lock 随机值 NX PX 30000

  执行这个命令就 ok。

  • NX:表示只有 key 不存在的时候才会设置成功。(如果此时 redis 中存在这个 key,那么设置失败,返回 nil
  • PX 30000:意思是 30s 后锁自动释放。别人创建的时候如果发现已经有了就不能加锁了。

  释放锁就是删除 key ,但是一般可以用 lua 脚本删除,判断 value 一样才删除:

-- 删除锁的时候,找到 key 对应的 value,跟自己传过去的 value 做比较,如果是一样的才删除。

if redis.call("get",KEYS[1]) == ARGV[1] then

return redis.call("del",KEYS[1])

else

return 0

end

  为啥要用随机值呢?因为如果某个客户端获取到了锁,但是阻塞了很长时间才执行完,比如说超过了 30s,此时可能已经自动释放锁了,此时可能别的客户端已经获取到了这个锁,要是你这个时候直接删除 key 的话会有问题,所以得用随机值加上面的 lua 脚本来释放锁。

但是这样是肯定不行的。因为如果是普通的 redis 单实例,那就是单点故障。或者是 redis 普通主从,那 redis 主从异步复制,如果主节点挂了(key 就没有了),key 还没同步到从节点,此时从节点切换为主节点,别人就可以 set key,从而拿到锁。

  RedLock 算法

  这个场景是假设有一个 redis cluster,有 5 个 redis master 实例。然后执行如下步骤获取一把锁:

  1. 获取当前时间戳,单位是毫秒;
  2. 跟上面类似,轮流尝试在每个 master 节点上创建锁,过期时间较短,一般就几十毫秒;
  3. 尝试在大多数节点上建立一个锁,比如 5 个节点就要求是 3 个节点 n / 2 + 1
  4. 客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了;
  5. 要是锁建立失败了,那么就依次之前建立过的锁删除;
  6. 只要别人建立了一把分布式锁,你就得不断轮询去尝试获取锁。

  zk 分布式锁

  zk 分布式锁,其实可以做的比较简单,就是某个节点尝试创建临时 znode,此时创建成功了就获取了这个锁;这个时候别的客户端来创建锁会失败,只能注册个监听器监听这个锁。释放锁就是删除这个 znode,一旦释放掉就会通知客户端,然后有一个等待着的客户端就可以再次重新加锁。

  

publicclass ZooKeeperSession {

privatestatic CountDownLatch connectedSemaphore = new CountDownLatch(1);

private ZooKeeper zookeeper;

private CountDownLatch latch;

public ZooKeeperSession() {

try {

this.zookeeper = new ZooKeeper("192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 50000, new ZooKeeperWatcher());

try {

connectedSemaphore.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("ZooKeeper session established......");

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* 获取分布式锁

*

* @param productId

*/

public Boolean acquireDistributedLock(Long productId) {

String path = "/product-lock-" + productId;

try {

zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

returntrue;

} catch (Exception e) {

while (true) {

try {

// 相当于是给node注册一个监听器,去看看这个监听器是否存在

Stat stat = zk.exists(path, true);

if (stat != null) {

this.latch = new CountDownLatch(1);

this.latch.await(waitTime, TimeUnit.MILLISECONDS);

this.latch = null;

}

zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

returntrue;

} catch (Exception ee) {

continue;

}

}

}

returntrue;

}

/**

* 释放掉一个分布式锁

*

* @param productId

*/

publicvoid releaseDistributedLock(Long productId) {

String path = "/product-lock-" + productId;

try {

zookeeper.delete(path, -1);

System.out.println("release the lock for product[id=" + productId + "]......");

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* 建立zk session的watcher

*

* @author bingo

* @since 2018/11/29

*

*/

privateclass ZooKeeperWatcher implements Watcher {

publicvoid process(WatchedEvent event) {

System.out.println("Receive watched event: " + event.getState());

if (KeeperState.SyncConnected == event.getState()) {

connectedSemaphore.countDown();

}

if (this.latch != null) {

this.latch.countDown();

}

}

}

/**

* 封装单例的静态内部类

*

* @author bingo

* @since 2018/11/29

*

*/

privatestaticclass Singleton {

privatestatic ZooKeeperSession instance;

static {

instance = new ZooKeeperSession();

}

publicstatic ZooKeeperSession getInstance() {

return instance;

}

}

/**

* 获取单例

*

* @return

*/

publicstatic ZooKeeperSession getInstance() {

return Singleton.getInstance();

}

/**

* 初始化单例的便捷方法

*/

publicstaticvoid init() {

getInstance();

}

}

  也可以采用另一种方式,创建临时顺序节点:

  如果有一把锁,被多个人给竞争,此时多个人会排队,第一个拿到锁的人会执行,然后释放锁;后面的每个人都会去监听排在自己前面的那个人创建的 node 上,一旦某个人释放了锁,排在自己后面的人就会被 zookeeper 给通知,一旦被通知了之后,就 ok 了,自己就获取到了锁,就可以执行代码了。

  

publicclass ZooKeeperDistributedLock implements Watcher {

private ZooKeeper zk;

private String locksRoot = "/locks";

private String productId;

private String waitNode;

private String lockNode;

private CountDownLatch latch;

private CountDownLatch connectedLatch = new CountDownLatch(1);

privateint sessionTimeout = 30000;

public ZooKeeperDistributedLock(String productId) {

this.productId = productId;

try {

String address = "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181";

zk = new ZooKeeper(address, sessionTimeout, this);

connectedLatch.await();

} catch (IOException e) {

thrownew LockException(e);

} catch (KeeperException e) {

thrownew LockException(e);

} catch (InterruptedException e) {

thrownew LockException(e);

}

}

publicvoid process(WatchedEvent event) {

if (event.getState() == KeeperState.SyncConnected) {

connectedLatch.countDown();

return;

}

if (this.latch != null) {

this.latch.countDown();

}

}

publicvoid acquireDistributedLock() {

try {

if (this.tryLock()) {

return;

} else {

waitForLock(waitNode, sessionTimeout);

}

} catch (KeeperException e) {

thrownew LockException(e);

} catch (InterruptedException e) {

thrownew LockException(e);

}

}

publicboolean tryLock() {

try {

// 传入进去的locksRoot + “/” + productId

// 假设productId代表了一个商品id,比如说1

// locksRoot = locks

// /locks/10000000000,/locks/10000000001,/locks/10000000002

lockNode = zk.create(locksRoot + "/" + productId, newbyte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

// 看看刚创建的节点是不是最小的节点

// locks:10000000000,10000000001,10000000002

List<String> locks = zk.getChildren(locksRoot, false);

Collections.sort(locks);

if(lockNode.equals(locksRoot+"/"+ locks.get(0))){

//如果是最小的节点,则表示取得锁

returntrue;

}

//如果不是最小的节点,找到比自己小1的节点

int previousLockIndex = -1;

for(int i = 0; i < locks.size(); i++) {

if(lockNode.equals(locksRoot + “/” + locks.get(i))) {

previousLockIndex = i - 1;

break;

}

}

this.waitNode = locks.get(previousLockIndex);

} catch (KeeperException e) {

thrownew LockException(e);

} catch (InterruptedException e) {

thrownew LockException(e);

}

returnfalse;

}

privateboolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException {

Stat stat = zk.exists(locksRoot + "/" + waitNode, true);

if (stat != null) {

this.latch = new CountDownLatch(1);

this.latch.await(waitTime, TimeUnit.MILLISECONDS);

this.latch = null;

}

returntrue;

}

publicvoid unlock() {

try {

// 删除/locks/10000000000节点

// 删除/locks/10000000001节点

System.out.println("unlock " + lockNode);

zk.delete(lockNode, -1);

lockNode = null;

zk.close();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (KeeperException e) {

e.printStackTrace();

}

}

publicclass LockException extends RuntimeException {

privatestaticfinallong serialVersionUID = 1L;

public LockException(String e) {

super(e);

}

public LockException(Exception e) {

super(e);

}

}

}

redis 分布式锁和 zk 分布式锁的对比

  • redis 分布式锁,其实需要自己不断去尝试获取锁,比较消耗性能。
  • zk 分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小。

另外一点就是,如果是 redis 获取锁的那个客户端 出现 bug 挂了,那么只能等待超时时间之后才能释放锁;而 zk 的话,因为创建的是临时 znode,只要客户端挂了,znode 就没了,此时就自动释放锁。

redis 分布式锁大家没发现好麻烦吗?遍历上锁,计算时间等等......zk 的分布式锁语义清晰实现简单。

所以先不分析太多的东西,就说这两点,我个人实践认为 zk 的分布式锁比 redis 的分布式锁牢靠、而且模型简单易用。

四、接口幂等性

以上是 分布式之分布式事务、分布式锁、接口幂等性、分布式session 的全部内容, 来源链接: utcz.com/z/508785.html

回到顶部