演示工程搭建

一个电商平台,该平台上有5000个商品,每个商品初始库存数量为1,总共有5000个用户在同一时刻下单购买商品,每个用户下单时会对所选商品库存数量进行一次减操作。如果最终所有商品的库存数量均为0,则表示所有商品已经售罄,如果有任何一个商品的库存数量大于0,则表示出现了并发问题(竞态条件问题(Race Condition)是指多个线程或进程在访问共享资源时,由于执行顺序不确定或者执行时间差异较大,导致最终的结果与执行顺序有关,出现了不确定性和不可预测性的情况。

竞态条件问题通常出现在并发编程中,例如多线程对同一个变量进行读写、多进程访问同一个文件等情况。为了更好地演示竞态条件问题,我们可以采用另一种方式来测试。

假设有5000个请求,我们将库存初始值设置为0,每个请求对库存进行一次加操作。如果最终库存数量为5000,则说明所有请求均已成功增加库存,没有出现并发问题。反之,如果最终库存数量小于5000,则说明出现了竞态条件问题。

并发问题解释

测试本地锁存在的问题

不加锁下的并发

可以发现并没有加到 5000值仅有 114。出现并发问题

添加本地锁的并发

添加 synchronized 本地锁, 将 number 值重置为 0 进行压测。得数是 5000 看似没有出现并发性问题,但是 本地锁只能在单个JVM内部生效,无法跨服务、跨工程、跨服务器实现协调和同步。

测试集群下本地锁存在的问题

copy 2 份实例模拟集群环境 将 number 设置为 0 重新压测 5000次。最终结果 2360 原因是极限情况下 3 台服务可能同时放入一个线程 同时到达 都将 num 转换为某一个数字 ++. 理论值在 5000 / 3 至 5000 间

分布式锁实现

基于什么实现 实现方式 考量
基于 MySQL 关系型数据库 设置主键, 把写入主键做为 lock
INSERT INTO lock_table (lock_name) VALUES (‘my_lock_name’) ON DUPLICATE KEY UPDATE lock_name = lock_name;
DELETE FROM lock_table WHERE lock_name = ‘my_lock_name’;
MySQL 实现方式最简单 > Redis > Zk
基于 Redis 非关系型数据库 加锁 setnx, 解锁 del, 重试 Redis 性能最高 偏向 AP > Zk 偏向 CP > MySQL
基于 Zookeeper 实现 Zk 追求 CP 可靠性 > Redis & MySQL

基于 Redis 实现分布式锁

分布式锁加锁解锁添加 else 的必要性

执行方法本质是一个入栈出栈的执行过程, testLock() 第一次执行压栈。未获取到锁进入 if() 等待睡眠时间结束后进行重试,继续压栈。如果重试后还是没有获取到锁,继续进入 if() 等待睡眠结束然后压栈。直到获取锁成功执行完后依次出栈。

本质一个请求我们只希望它对 number 进行 + 1,在不加 else 的情况下,每次重试都会使程序重新调用 testLock() 方法,从而重新执行对资源 number 的 + 操作。这样就会导致一个请求重试多少次,就会让我们多 + 多少次,增加了资源的不必要修改和开销。因此,在加了 else 的情况下,只有成功获取到锁时才进行对资源的修改,避免了重复操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 基于 redis 实现分布式锁。借助于 setnx 指令 当 key 不存在即设置成功返回 1 当 key 存在即设置失败返回 0(加锁 解锁 重试)分布式锁特征: 独占排他互斥使用
public void testLock() {
/**
* 加锁 setIfAbsent 类似与 setNx 当 key 不存在即设置成功 否 则 失败
* 分布式锁本质就是对 key 的争抢, 谁先设置成功谁就先获取锁
*/
Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", "lock");

if (!flag) {
// 加锁失败, 进行递归调用进行重试
try {
// 睡眠一段时间(如果不设置睡眠不停的重试也可能会导致栈内存溢出) 模拟让抢到锁的线程执行业务逻辑 减少竞争
Thread.sleep(30);
// 设置锁(加锁)失败重新调用该方法进行重试
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
String number = redisTemplate.opsForValue().get("number");

if (StringUtils.isBlank(number)) {
redisTemplate.opsForValue().set("number", "1");
}

int num = Integer.parseInt(number);

redisTemplate.opsForValue().set("number", String.valueOf(++num));

// 释放锁
redisTemplate.delete("lock");
}
}

添加过期时间防止死锁

问题: 一个线程获取到锁 还没有执行到释放锁操作 服务器宕机. 其他线程获取不到锁 即使 服务器重启 这把锁也无法被释放掉. 其他线程一直执行递归操作 最终导致服务器资源耗尽而宕机

解决: 在获取锁的同时设置过期时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
    public void testLock() {
/**
* 加锁 setIfAbsent 类似与 setNx 当 key 不存在即设置成功 否 则 失败
* 分布式锁本质就是对 key 的争抢, 谁先设置成功谁就先获取锁
*/
Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", "lock", 3, TimeUnit.SECONDS); // 解决死锁 添加过期时间在 set(获取锁) 时, 去设置过期时间;

if (!flag) {
// 加锁失败, 进行递归调用进行重试
try {
// 睡眠一段时间(如果不设置睡眠不停的重试也可能会导致栈内存溢出) 模拟让抢到锁的线程执行业务逻辑 减少竞争
Thread.sleep(30);
// 设置锁(加锁)失败重新调用该方法进行重试
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {

// 添加过期时间 (缺乏原子性:如果在 setnx 和 expire 之间出现异常,锁也无法释放)
// redisTemplate.expire("lock", 3, TimeUnit.SECONDS);

String number = redisTemplate.opsForValue().get("number");

if (StringUtils.isBlank(number)) {
redisTemplate.opsForValue().set("number", "1");
}

int num = Integer.parseInt(number);

redisTemplate.opsForValue().set("number", String.valueOf(++num));

// 释放锁
redisTemplate.delete("lock");
}
}

添加 UUID 防误删

问题: A、B、C 三个服务都在尝试获取同一个锁,并且这个锁没有设置续期过期时间。
如果业务逻辑的执行时间是7s, A 服务获取锁 业务没有执行完 锁3秒被自动释放, B 服务获取到锁 业务没有执行完 锁3秒被自动释放, C 服务获取锁执行业务逻辑.A 服务业务执行完成 释放锁, 这时释放的是 C 的锁. 导致 C 业务只执行了 1s 就被别人释放. 最终等于没有锁(可能会释放其他服务器的锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
    public void testLock() {

String uuid = UUID.randomUUID().toString();

/**
* 加锁 setIfAbsent 类似与 setNx 当 key 不存在即设置成功 否 则 失败
* 分布式锁本质就是对 key 的争抢, 谁先设置成功谁就先获取锁
*/
Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS); // 解决死锁 添加过期时间在 set(获取锁) 时, 去设置过期时间;

if (!flag) {
// 加锁失败, 进行递归调用进行重试
try {
// 睡眠一段时间(如果不设置睡眠不停的重试也可能会导致栈内存溢出) 模拟让抢到锁的线程执行业务逻辑 减少竞争
Thread.sleep(30);
// 设置锁(加锁)失败重新调用该方法进行重试
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {

// 添加过期时间 (缺乏原子性:如果在 setnx 和 expire 之间出现异常,锁也无法释放)
// redisTemplate.expire("lock", 3, TimeUnit.SECONDS);

String number = redisTemplate.opsForValue().get("number");

if (StringUtils.isBlank(number)) {
redisTemplate.opsForValue().set("number", "1");
}

int num = Integer.parseInt(number);

redisTemplate.opsForValue().set("number", String.valueOf(++num));

if (StringUtils.equals(redisTemplate.opsForValue().get("lock"), uuid)) { // 解锁时判断是否是自己的锁
// 释放锁
redisTemplate.delete("lock");
}
}
}

使用 LUA 脚本保证删除的原子性

A 线程获取锁并且执行完业务逻辑到 最后一步释放锁的环节, 查询 Redis 判断相等 准备进入 if() 进行删除锁, 到此处 锁过期

B 线程在 A 线程锁释放后,获取到锁。刚拿到锁 A 线程对锁进行释放。导致 B 线程实际上是无锁的(本质上是判断跟删除缺乏原子性导致的)

1
2
3
4
5
6
7
// 判断 redis 中 lock 值是否跟当前 uuid 一致, 如果一致则执行 del 指令
if redis.call('get', lock) == uuid
then
return redis.call('del', lock)
else
return 0
end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
    public void testLock() {

String uuid = UUID.randomUUID().toString();

/**
* 加锁 setIfAbsent 类似与 setNx 当 key 不存在即设置成功 否 则 失败
* 分布式锁本质就是对 key 的争抢, 谁先设置成功谁就先获取锁
*/
Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS); // 解决死锁 添加过期时间在 set(获取锁) 时, 去设置过期时间;

if (!flag) {
// 加锁失败, 进行递归调用进行重试
try {
// 睡眠一段时间(如果不设置睡眠不停的重试也可能会导致栈内存溢出) 模拟让抢到锁的线程执行业务逻辑 减少竞争
Thread.sleep(30);
// 设置锁(加锁)失败重新调用该方法进行重试
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {

// 添加过期时间 (缺乏原子性:如果在 setnx 和 expire 之间出现异常,锁也无法释放)
// redisTemplate.expire("lock", 3, TimeUnit.SECONDS);

String number = redisTemplate.opsForValue().get("number");

if (StringUtils.isBlank(number)) {
redisTemplate.opsForValue().set("number", "1");
}

int num = Integer.parseInt(number);

redisTemplate.opsForValue().set("number", String.valueOf(++num));

// 判断 redis 中 lock 值是否跟当前 uuid 一致, 如果一致则执行 del 指令
String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
"then " +
" return redis.call('del', KEYS[1])" +
"else " +
" return 0 " +
"end";

// execute 可以接受 lua 脚本, 传入的脚本字符串(脚本字符, 返回类型), key 列表, arg 列表
redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList("lock"), uuid);

// if (StringUtils.equals(redisTemplate.opsForValue().get("lock"), uuid)) { // 解锁时判断是否是自己的锁
// // 释放锁
// redisTemplate.delete("lock");
// }
}
}

不可重入导致的死锁

不可重入导致的死锁是指一个线程已经获取了锁,在没有释放锁的情况下再次请求获取锁会导致死锁。通俗地说,一个线程在持有锁的情况下,再次去获取锁的时候会被自己给阻塞住,这样就无法继续执行,最终导致死锁。

假设线程 T1 先获取了 B 锁,然后在调用 A 方法时,由于 A 锁已经被其他线程占用了,因此 T1 会一直等待 A 锁的释放。此时,如果另外一个线程 T2 需要获取 B 锁,它会一直等待 T1 释放 B 锁,而 T1 又一直等待 A 锁的释放,这就导致了死锁。

名词 释意
可重入锁 当一个线程持有一个锁的时候,可以重复的获取该锁而不会导致死锁。
不可重入锁 不能重复获取锁,因为重复获取锁会导致死锁。
  • 假设有两个方法 A 和 B,它们都需要对一个共享资源进行访问,并且都使用了锁来保证线程安全。如果 A 方法使用的是可重入锁,那么当它需要在 B 方法中被调用时,它可以继续持有自己的锁,而不需要去获取 B 方法中的锁,这样就避免了死锁的情况。
  • 相反,如果 A 方法使用的是不可重入锁,当它需要在 B 方法中被调用时,如果 B 方法中也使用了锁,那么 A 方法就需要释放自己的锁才能获取 B 方法的锁,这样就会出现死锁的情况。

可重入加锁

参照 ReentrantLock 非公平锁加锁方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*
* 执行非公平的 tryLock。 tryAcquire 是在子类中实现的,但是都需要对 trylock 方法进行非公平尝试
*/
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取 state 值, state 是当前可重入次数
int c = getState();
// 如果为 0 则表示没有人获取到锁
if (c == 0) {
// CAS 尝试获取锁, 将 state 值由 0 更新为 1
if (compareAndSetState(0, acquires)) {
// 更新成功,将当前线程记录为排他有锁线程
setExclusiveOwnerThread(current);
// 获取锁成功
return true;
}
}
// 如果不为 0,也就意味着有人获取到锁了. 进行判断当前线程是否是排他有锁线程
else if (current == getExclusiveOwnerThread()) {
// 原有值进行递增 1,表示可重入次数 + 1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 记录最新的 state 值
setState(nextc);
// 获取锁成功
return true;
}
// 获取锁失败
return false;
}

我们可以通过 Redis 中的 hash 数据结构来记录,锁名称、唯一标识 uuid、可重入次数

  • 实现思路 hash Map<lockName, Map<uuid, 重入次数>>

    • 判断锁是否存在(exists), 如果不存在(0) 则直接获取锁(hset)

    • 判断是否自己的锁(hexists), 如果是(1)则重入(hincrby)

    • 否则获取锁失败, 返回 0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 判断锁是否存在(exists), 如果不存在(0) 则直接获取锁(hset)
if redis.call('exists', 'lock') == 0
then
redis.call('hset', 'lock', 'uuid', 1)
// 设置过期时间,防止服务器宕机导致的死锁问题
redis.call('expire', 'lock', 30)
// 表示加锁成功
return 1
// 判断是否自己的锁(hexists), 如果是(1)则重入(hincrby)
elseif redis.call ('hexists', 'lock', 'uuid') == 1
then
redis.call('hincrby', 'lock', 'uuid', 1)
// 一旦重入重入成功重新设置过期时间
redis.call('expire', 'lock', 30)
// 表示重入成功
return 1
else
// 锁存在 并且 与 当前线程的唯一标识不一致 可重入失败
return 0
end

// key: lockName
// argv: uuid, 过期时间

if redis.call('exists', KEYS[1]) == 0
then
redis.call('hset', KEYS[1], ARGV[1], 1)
redis.call('expire', KEYS[1], 30)
return 1
elseif redis.call ('hexists', KEYS[1], ARGV[1]) == 1
then
redis.call('hincrby', KEYS[1], ARGV[1], 1)
redis.call('expire', KEYS[1], ARGV[2])
return 1
else
return 0
end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if redis.call('exists', 'lock') == 0 or redis.call ('hexists', 'lock', 'uuid') == 1
then
redis.call('hexists', 'lock', 'uuid', 1)
redis.call('expire', 'lock', 30)
return 1
else
return 0
end

// key: lockName
// argv: uuid, 过期时间

if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1
then
redis.call('hincrby', KEYS[1], ARGV[1], 1)
redis.call('expire', KEYS[1], ARGV[2])
return 1
else
return 0
end

可重入解锁

参照 ReentrantLock 非公平锁解锁方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 当前线程是否跟排他拥有者一致, 不一致抛出异常 恶意释放锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 判断是否为 0
if (c == 0) {
// 释放锁成功
free = true;
// 将排他拥有者线程清空
setExclusiveOwnerThread(null);
}
// 重置 state 值
setState(c);
// 返回是否释放成功
return free;
}
  • 实现思路
    • 判断自己的锁是否存在(hexists),如果不存在(0)则返回nil
    • 如果自己的锁存在,则直接减1(hincrby -1),并判断减1后的值是否为0,为0则直接释放锁(del) 返回1
    • 直接返回 0, 表示出来一次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 判断自己的锁是否存在(hexists),如果不存在(0)则返回nil
if redis.call('hexists', lock, 'uuid') == 0
then
// nil 相当于 Java 中的 null, 接收到 null 值就表示在恶意释放锁 抛出异常
return nil
// 如果自己的锁存在,则直接减1(hincrby -1),并判断减1后的值是否为0,为0则直接释放锁(del) 返回1
elseif redis.call('hincrby', lock, 'uuid', -1) == 0
then
// redis.call('del', lock)
// return 1
// redis 删除成功默认返回值就是 1
return redis.call('del', lock)
else
// 直接返回 0, 表示出来一次
return 0
end

key: lockName
argv: uuid

if redis.call('hexists', KEYS[1], ARGV[1]) == 0
then
return nil
elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0
then
return redis.call('del', KEYS[1])
else
return 0
end

自动续期

可以 通过 Timer 去完成定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    public static void main(String[] args) {
System.out.println("定时任务的初始时间 " + System.currentTimeMillis());
new Timer().schedule(new TimerTask() {
@Override
public void run() {
System.out.println("定时器定时任务: " + System.currentTimeMillis());
}
},5000, 10000);

// 取消定时器
// new Timer().cancel();

// JUC 定时任务无法续期无法进行取消
// ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
// System.out.println("定时任务的初始时间 " + System.currentTimeMillis());
// 初始延迟 5 秒,以后每 10 秒执行一次
// scheduledExecutorService.scheduleAtFixedRate(() -> {
// System.out.println("juc 中的定时任务: " + System.currentTimeMillis());
// }, 5, 10, TimeUnit.SECONDS);
}
  • 实现思路
    • 判断是否是自己的锁,则重置过期时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if redis.call('hexists', lock, uuid) == 1 
then
// 重置成功返回 1,key 不存在设置过期时间返回 0
return redis.call('expire', lock, 30)
else
return 0

key: lockName
arg: uuid, 过期时间

if redis.call('hexists', KEYS[1], ARGV[1]) == 1
then
return redis.call('expire', KEYS[1], ARGV[2])
else
return 0

防止脑裂:RedLock 算法

如果不搭建集群可能会出现单点故障问题,搭建集群会出现如果某个服务从旧的主节点获取了锁,但是在旧的主节点挂掉后,某个从节点升级为新的主节点并不知道已经有人获取了锁,那么就会出现两个客户端同时获取了同一个锁的情况,这将导致数据不一致和并发问题。这种情况称为“脑裂”(Split Brain)问题。

Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。