分布式锁

编程

分布式锁" title="分布式锁">分布式锁

在叙述分布式锁前,先对锁的原理进行理解。

如何避免竞争条件

两个或多个进程读写某些共享数据,而最后的结果取决于进程运行的精确时序,称为竞争条件。——《现代操作系统》

而锁正是避免竞争条件的解决方案之一。

再列出《现代操作系统》一书中,要成为一个解决竞争条件的好方案的条件:

  1. 任何两个进程不能同时处于其临界区
  2. 不应对 CPU 的速度和数量做任何假设
  3. 临界区外运行的进程不得阻塞其他进程
  4. 不得使进程无限期等待进入临界区

互斥量

互斥量是构建锁的一个基础原理。

互斥量是一个可以处于两态之一的变量:解锁和加锁。如,用 0 表示解锁,其他值表示加锁。当一个线程或是进程需要访问临界区时,如果互斥量当前是解锁状态,即线程可以自由进入临界区,并更改互斥量的值,当然这个过程需要是原子性的。

如果互斥量已经加锁,调用线程被阻塞,直到获得锁的线程将互斥量重置。

而我们平时使用的 java 的 ReentrantLock 也是基于互斥量的原理,使用 CAS 更新互斥量,更新成功即获得锁,否则进入阻塞。

根据互斥量的定义和 ReentrantLock 的实现原理,这里总结一下互斥量的性质:

  1. 互斥量状态判断到变更的过程必须是原子性的
  2. 当无法获取互斥量时返回,无法进入临界区

对于如 ReentrantLock 的实现,如果获取不了锁,会阻塞线程,其实就是无法获取互斥量时返回并进行线程阻塞,等待解锁。

什么是分布式锁

分布式锁本质上和进程锁类似,都是协调多个进程的运行从而避免竞争条件。

但是进程锁的作用范围是同一机器上的多个进程,进程锁的实现可以依赖本地操作系统的机制,如信号量。

分布式锁的作用范围是运行在多个机器上的进程,往往需要由独立于这些进程的第三方提供锁机制。

分布式锁的实现

锁是十分重要的机制,如果一个分布式锁的实现不够成熟,带来的效果是很严重的,所引发的问题是后知后觉的,对问题排查和测试都不友好。所以使用分布式锁时推荐使用一些权威的实现,而且在使用前一定要对源码进行分析,有所了解。而且不用多说,分布式锁的实现肯定要符合上述提到的 4 个条件。

基于 Redis 实现的分布式锁

RedisLockRegistry

RedisLockRegistry 是 Spring 提供的基于 Redis 服务实现的分布式锁,具体点来说 RedisLockRegistry 是一个 DLM (Distributed Lock Manager) ,分布式锁的管理者。

具体的锁实现称为 RedisLock 。

通过以下依赖引入:

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-integration</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.integration</groupId>

<artifactId>spring-integration-redis</artifactId>

<version>${spring.version}</version>

</dependency>

RedisLock 实现了 Java 中的 Lock 接口,要从线程锁切换成使用分布式锁也是较为简单的。

RedisLock 的实现有两个关键组件,一个是 ReentrantLock 实例,为本地锁。另外一个则是基于 Redis 实现的互斥量。

先理解基于 Redis 的互斥量实现。

获取互斥量的方法利用 Lua 脚本传递命令给 Redis ,Lua 脚本如下:

local lockClientId = redis.call("GET", KEYS[1])

if lockClientId == ARGV[1] then

redis.call("PEXPIRE", KEYS[1], ARGV[2])

return true

elseif not lockClientId then

redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])

return true

end

return false;

根据上述的脚本可知,Redis 用于实现互斥量的键值对为 String 类型,Key 为资源标识,值则为每个应用唯一的 ClientId 。

同时获取互斥量时,都会给 Key 设置一个过期时间,这个是用于实现条件不得使进程无限期等待进入临界区,防止获取到互斥量的应用挂掉或是网络原因等让互斥量永远没法释放。

释放互斥量的方法由两个步骤完成:

  1. 获取键值对的值,判断这个键值对存储的值是否和自身的 ClientId 一致。
  2. 如果 ClientId 一致,则直接删除键值对

这个互斥量的实现,在获取时,通过 Redis 的单线程串行特性保证线程安全,通过过期时间防止死锁,同时互斥量带上应用标识 ID,保证互斥量的正确性,即谁获得谁释放。

然后要谈到的为由 ReentrantLock 实例实现的 localLock 本地锁。

我自己以前也尝试实现过分布式锁,在实现时,遇到了一个问题,进程锁是否应该作用到线程。

试想以下情况:

  1. 应用 PA 获取了分布式锁。
  2. 获取锁的是应用 PA 的线程 TA 。
  3. 线程应用 PA 的线程 TB 解锁

以上情况的问题在于,TB 是否有权解锁,以分布式锁的角度去看,好像没问题,因为分布式锁的作用范围是多个机器的独立进程,获取到锁的进程,自然可以进行解锁,而具体到以怎么样的方式解锁,不需要清楚。

如果这样的话,在线程范围上来看,这个锁的实现是不争取的。

RedisLock 的实现则利用本地锁来实现不管是在进程还是线程的范围,都能保证锁的正确性。在获取 Redis 服务的互斥量前,必须获取到本地锁。

除此之外本地锁还有一个重要的作用就是提升性能。在日常使用中,同一个资源的锁很有可能总是由同一个进程不断请求获取,那么这时分布式锁就退化成了一个线程锁,这时本地锁则可以减少对 Redis 服务的请求。这和 Java 中 synchronized 关键字所实现的锁的“偏向锁”状态思想很类似:

大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低引入偏向锁。

以下为 RedisLock 的 lock 实现:

public void lock() {

// 本地锁先尝试加锁

this.localLock.lock();

while (true) {

try {

// 获取到本地锁后,获取 Redis 中的互斥量

while (!obtainLock()) {

// 由于 Redis 实现的互斥量无法阻塞线程,利用循环和线程休眠实现线程阻塞

Thread.sleep(100);

}

break;

}

catch (InterruptedException e) {

// 这个方法是不可中断的,只有获取到锁或是出错才能跳出循环

}

catch (Exception e) {

// 发生错误必须保证本地锁解锁

this.localLock.unlock();

rethrowAsLockException(e);

}

}

}

方法 obtainLock 的实现:

private boolean obtainLock() {

// 调用上述提到的 Lua 脚本。这个脚本可以重复调用,重复调用其实就是重新设置过期时间,用于实现重入性

Boolean success =

RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript,

Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId,

String.valueOf(RedisLockRegistry.this.expireAfter));

boolean result = Boolean.TRUE.equals(success);

if (result) {

this.lockedAt = System.currentTimeMillis();

}

return result;

}

tryLock 的实现也是类似的,接下来看 unlock 的实现:

public void unlock() {

// 本地锁先判断当前线程是否获取到了本地锁

if (!this.localLock.isHeldByCurrentThread()) {

throw new IllegalStateException("You do not own lock at " + this.lockKey);

}

// 判断本地锁是否重入

if (this.localLock.getHoldCount() > 1) {

this.localLock.unlock();

return;

}

try {

// 根据 clientId 判断是否为当前进程获取到锁

if (!isAcquiredInThisProcess()) {

throw new IllegalStateException("Lock was released in the store due to expiration. " +

"The integrity of data protected by this lock may have been compromised.");

}

// 如果当前线程被中止了,则在线程池中删除 Redis 中的键值对

if (Thread.currentThread().isInterrupted()) {

RedisLockRegistry.this.executor.execute(this::removeLockKey);

}

else {

// 删除键值对,表示释放互斥量

removeLockKey();

}

if (logger.isDebugEnabled()) {

logger.debug("Released lock; " + this);

}

}

catch (Exception e) {

ReflectionUtils.rethrowRuntimeException(e);

}

finally {

this.localLock.unlock();

}

}

通过上述的源码,可以清楚看到,RedisLock 是支持重入的。

整体的时序图如下:

sequenceDiagram

participant Business as 业务 A

participant RedisLockRegistry as RedisLockRegistry

participant RedisLock as RedisLock

participant LocalLock as ReentrantLock

participant Redis as Redis

Business ->> RedisLockRegistry: obtain(lockKey)

RedisLockRegistry ->> Business: RedisLock

Business ->> RedisLock: lock

RedisLock ->> LocalLock: lock

LocalLock -->> RedisLock: 加锁成功

loop 不断尝试获取 Redis 互斥量

RedisLock ->> Redis: 通过 Lua 脚本设置互斥量键值对

end

Redis -->> RedisLock: 设置值成功

RedisLock -->> Business: 获取锁成功

Business ->> RedisLock: unlock

RedisLock ->> LocalLock: unlock

LocalLock -->> RedisLock: 释放锁成功

RedisLock -->> Redis: del 互斥量键值对

Redis -->> RedisLock: del 成功

RedisLock -->> Business: 释放锁成功

Redis 官方实现

在 Redis 的官方文档中有对于利用 Redis 实现分布式锁的文档:Distributed locks with Redis ,提供一个规范的算法用于实现分布式锁。

文档中有两种实现分布式锁的方式,一种是针对于单个 Redis 服务的实现(一个集群也被视为一个 Redis 服务),一种是利用多个 Redis 服务实现的,高可用的实现,这种实现被称为 Redlock

基于单个 Redis 服务的实现

首先说一下基于单个 Redis 服务的实现。

通过以下命令获取互斥量:

SET resource_name my_random_value NX PX 30000

上述的命令一目了然,需要注意的是 my_random_value ,这个 my_random_value 和上面提到的 clientId 不一样,它不仅要求每个应用之间不一样,还要要求到每个锁请求都不一样。

具体来说,同一个应用中,业务线程 A 请求锁,它的值为 A_random_value ,业务线程 B 请求同一个锁,它的值为 B_random_value 。其实这样做也是为了锁不管在线程范围还是进程范围都是正确的。

释放互斥量则使用以下 Lua 脚本:

if redis.call("get",KEYS[1]) == ARGV[1] then

return redis.call("del",KEYS[1])

else

return 0

end

传递的 ARGV 为 random_value ,只有和键值对中的值相等才能够删除,即释放互斥量。

算法原理和上面说到的 RedisLock 大致一致。如果自己根据上面的算法实现的话,并不复杂,主要是 random_value 的维护。

Redlock

Redlock 的实现是基于多个 Redis 服务,通过多个 Redis 服务来保证锁的高可用性。

使用单个 Redis 服务实现的话,是会有单点问题,哪怕所使用的 Redis 服务是集群。使用 Redis Cluster 或是代理的方式实现集群,都是利用主从复制来保证高可用,但是 Redis 的主从复制是最终一致性,如果带有互斥量的 Redis 节点挂掉,那么这个互斥量已经是不可用的了,从库是没法保证持有的互斥量副本是正确的。

像上面的 RedisLock 实现,如果遇到 Redis 服务挂掉的情况,即抛出异常,业务失败。

Redlock 对于互斥量的获取和释放使用的算法和单个 Redis 服务实现的一样。Redlock 的算法实现涉及到 N 个独立的 Redis 服务,这里假设 N = 5 。这 5 个 Redis 分别运行在独立的机器上,具体算法如下:

  1. 获取当前时间,单位为毫秒。
  2. 顺序的向 N 个 Redis 服务获取互斥量,使用相同的 key (即资源名)和 random_value 发起获取互斥量的请求。在该步骤中,每个请求都会设置一个比起锁的有效时间小很多的时间作为超时时间。如锁的有效时间为 10 s ,那么请求时间的超市时间在 5-50 ms 之间。这样做的原因是避免在与多个 Redis 服务联系时,发生长时间的阻塞,如果一个 Redis 服务不活跃了,可以尽快的请求下一个服务。
  3. 只有获取到 N / 2 + 1 个 Redis 服务的互斥量才算成功获取。客户端通过步骤 1 获取的时间戳来计算花费了多少时间来成功获取互斥量。只有客户端获取互斥量的时间小于锁的有效期,才认为获取成功。
  4. 当互斥量被获取,其实际有效效时间为初始有效时间减去步骤 3 中计算出来的获取互斥量所花的时间。
  5. 如果客户端获取互斥量失败,则需要尝试向所有 Redis 服务发起释放请求。

Redlock 为了保证锁机制的安全性以及可用性,每个互斥量都会设置一个过期时间,防止资源死锁,保证锁最终总是可获得的。

上述的机制并不依赖于不同机器之间的时钟同步,而是依赖各个机器之间能够以近乎一样的时间流逝速度,机器之间的时间流逝速度不同,即 clock drift 所造成的误差与锁的自动释放时间相比,是很小的。

若从第一个 Redis 中获取到互斥量的时间为 T1 ,而从最后一个 Redis 中获取到互斥量的时间为 T2 ,那么锁的有效实际实际为:

MIN_VALIDITY=TTL-(T2-T1)-CLOCK_DRIFT

只要在这个时间内完成操作,那么就可以保证操作时线程安全的,同时只要这个时间过去后,就表示锁可以重新被获取。

Redlock 通过使用多个 Redis 服务,保证分布式锁的高可用性。当其中一个 Redis 服务宕机时,需要如何处理呢?

如果使用主从的方式进行故障转移,同样会遇到单个 Redis 服务实现的问题,无法保证一致性,那么这时有可能会让两个应用同时获取同一个锁。而如果不进行故障转移,而是直接重启服务,那么同样会有两个应用同时获取同一个锁的问题。具体的情况如下:

  1. 应用 A 成功获取了 R1 R2 R3 三个 Redis 服务的互斥量
  2. R3 宕机
  3. 应用 B 获取 R4 R5 这两个 Redis 服务的互斥量
  4. R3 宕机后重启或是进行了故障转移,之前应用 A 获取互斥量的状态丢失
  5. 应用 B 请求 R3 获取互斥量成功

这时,可能会想到去使用 Redis 的持久化。首先 Redis 的 RDB 持久化也是最终一致性的,而 AOF 也不能完全保证数据不被丢失,并且会影响性能,选择 Redis 实现分布式锁也是希望锁具备良好的性能。

为了避免上述的情况,可以使用延迟重启的方案。当服务宕机后,不应该立刻重启,而是等待一段时间后再重启,这段时间应该大于锁的有效时间,这样的话,重启后的 Redis 它之前的状态已经在客观上过期。这样的话可以保证锁的正确性,并且使分布式锁具备高可用性。

最后当锁获取失败后,应用应该等待一段随机的时间后再去重试,而且这个时间最好大于获取到 N / 2 + 1 个 Redis 的互斥量所需的时间,不然所有应用总是在相同的时间节点去重试,那么很有可能谁也没法获取到 N / 2 + 1 个 Redis 的互斥量,然后不断重试。

在应用开发中,可以使用 Redlock 成熟的实现:Redisson

基于数据库实现的分布式锁

JdbcLockRegistry

JdbcLockRegistry 同样也是 Spring 实现的一个 DLM 。

通过以下依赖引入:

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-integration</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.integration</groupId>

<artifactId>spring-integration-jdbc</artifactId>

<version>${spring.version}</version>

</dependency>

实现的原理大致与 RedisLockRegistry 一致,不同的是 JdbcLockRegistry 中的 JdbcLock 是基于数据库实现的。所以这里主要关注于 JdbcLock 如果通过数据库实现互斥量。

锁的表结构为:

CREATE TABLE INT_LOCK  (

LOCK_KEY CHAR(36) NOT NULL,

REGION VARCHAR(100) NOT NULL,

CLIENT_ID CHAR(36),

CREATED_DATE DATETIME(6) NOT NULL,

constraint LOCK_PK primary key (LOCK_KEY, REGION)

) ENGINE=InnoDB;

互斥量的实现主要依赖于以下 SQL 语句:

private String deleteQuery = "DELETE FROM %sLOCK WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?";

private String deleteExpiredQuery = "DELETE FROM %sLOCK WHERE REGION=? AND LOCK_KEY=? AND CREATED_DATE<?";

private String deleteAllQuery = "DELETE FROM %sLOCK WHERE REGION=? AND CLIENT_ID=?";

private String updateQuery = "UPDATE %sLOCK SET CREATED_DATE=? WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?";

private String insertQuery = "INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE) VALUES (?, ?, ?, ?)";

private String countQuery = "SELECT COUNT(REGION) FROM %sLOCK WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=? AND CREATED_DATE>=?";

获取互斥量的源码如下:

@Transactional(isolation = Isolation.SERIALIZABLE, timeout = 1)

@Override

public boolean acquire(String lock) {

deleteExpired(lock);

if (this.template.update(this.updateQuery, new Date(), this.region, lock, this.id) > 0) {

return true;

}

try {

return this.template.update(this.insertQuery, this.region, lock, this.id, new Date()) > 0;

}

catch (DuplicateKeyException e) {

return false;

}

}

根据上述的代码,可以了解到获取步骤如下:

  1. 删除已经过期的锁,这里通过调用 deleteExpiredQuery SQL 语句实现
  2. 通过 updateQuery 尝试更新过期时间,这里其实是一个重入操作,如果更新成功,则表示锁重入
  3. 通过调用 insertQuery 插入一条记录,插入成功即表示成功获取到锁,提是重入插入错误则表示互斥量已经被获取

根 RedisLock 一样,JdbcLock 的互斥量获取同样不具备阻塞的能力,需要在后续利用 Thread.sleep 实现阻塞功能。

释放互斥量则很简单,通过 deleteQuery 删除掉对应的记录。

最后还有 deleteAllQuery 语句和 countQuery 语句。deleteAllQuery 是关闭时调用的,删除掉该应用持有的所有锁,而 countQuery 则是用于判断是否获取了互斥量。

通过实现方式可以看出,JdbcLockRegistry 的性能是不高的,比不过 RedisLockRegistry ,但是得益于数据库的安全性和可靠性,JdbcLockRegistry 有很好的锁安全性。

如果业务对可靠性和安全性的要求很高,可以使用 JdbcLockRegistry ,或者是如内部后台,而且没有引入 Redis 的话,也可以尝试使用 JdbcLockRegistry 。

基于数据库锁的实现

这里讨论的只有基于 MySQL 数据库的实现。

数据库中,为了保证数据的一致性,通常都会有加锁的操作选项。

在 MySQL 中,可以使用以下语句进行加锁:

SELECT * FROM t WHERE id = 1 FOR UPDATE

这样的话,将会对 id 为 1 的记录加锁,这个锁为行锁,之后非这个事务的行写操作或是 SELECT ... FOR UPDATE 操作都会被阻塞,即到这个事务被提交。关于 MySQL 锁的详细资料参考:MySQL 8.0 Reference Manual - InnoDB Locking and Transaction Model

关于基于数据锁的实现示例,这里参考 Quartz 的数据库锁实现。

在实现中,涉及到的 SQL 语句如下:

SELECT *

FROM qrtz_locks

WHERE sched_name = ${sched_name}

AND lock_name = ${lock_name} FOR UPDATE;

INSERT INTO qrtz_locks (${sched_name}, ${lock_name}) VALUES("", "");

由于锁的释放持续到事务的提交,所以相关的数据库连接必须设置为事务非自动提交。

以下为 Quartz 中获取锁的相关源码:

public boolean obtainLock(Connection conn, String lockName)

throws LockException {

if(log.isDebugEnabled()) {

log.debug(

"Lock "" + lockName + "" is desired by: "

+ Thread.currentThread().getName());

}

if (!isLockOwner(lockName)) {

executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);

if(log.isDebugEnabled()) {

log.debug(

"Lock "" + lockName + "" given to: "

+ Thread.currentThread().getName());

}

getThreadLocks().add(lockName);

} else if(log.isDebugEnabled()) {

log.debug(

"Lock "" + lockName + "" Is already owned by: "

+ Thread.currentThread().getName());

}

return true;

}

重点首先放在 getThreadLocks().add(lockName) 中。

private HashSet<String> getThreadLocks() {

HashSet<String> threadLocks = lockOwners.get();

if (threadLocks == null) {

threadLocks = new HashSet<String>();

lockOwners.set(threadLocks);

}

return threadLocks;

}

Quartz 的分布式实现通过标记获取锁的线程来将锁的粒度控制到线程纬度,作用和上面提到的分布式锁实现一致。

再来看 executeSQL 方法为,该方法为执行相关的 SQL 语句,对行进行加锁,阻塞其他想要获取锁的访问。详细的实现在对象 StdRowLockSemaphore 中。

总体看来,获取分布式锁的操作步骤如下:

  1. 通过 SELECT ... FOR UPDATE 语句加锁
  2. 步骤 1 的 SQL 成功返回,表示获取了相应的行锁,这时检查表中是否有相应的记录,如果没有则通过 INSERT 语句插入相应的记录。如果 SQL 语句没有返回,则表示没获取到锁,这时连接会阻塞。
  3. 进行相应的业务操作,这时进行的业务操作是线程安全的
  4. 提交事务,同时删除本地的线程锁标志,释放锁

如果想要实现 Java 中的 Lock 接口,需要有 tryLock 的机制,通过 FOR UPDATE 进行加锁会对连接进行阻塞,直到可以获取到锁或是连接断开报错。

在 MySQL 8.0 中,如果相应的记录被加所,可以通过以下语句跳过锁等待,直接返回。

-- 如果 id = 1 的记录已经被加锁,该请求会直接返回错误

SELECT * FROM t WHERE id = 1 FOR UPDATE NOWAIT;

总结

基于 Redis 与基于数据库实现的分布式锁是比较常见常用的,当然还有基于 Zookeeper 的实现,或是基于其他能够提供原子性操作的服务实现,其核心思想也是一样的,利用服务的原子性操作实现互斥量,进而再实现锁的机制。

以上是 分布式锁 的全部内容, 来源链接: utcz.com/z/511624.html

回到顶部