演示工程搭建
一个电商平台,该平台上有5000个商品,每个商品初始库存数量为1,总共有5000个用户在同一时刻下单购买商品,每个用户下单时会对所选商品库存数量进行一次减操作。如果最终所有商品的库存数量均为0,则表示所有商品已经售罄,如果有任何一个商品的库存数量大于0,则表示出现了并发问题(竞态条件问题(Race Condition)是指多个线程或进程在访问共享资源时,由于执行顺序不确定或者执行时间差异较大,导致最终的结果与执行顺序有关,出现了不确定性和不可预测性的情况。
竞态条件问题通常出现在并发编程中,例如多线程对同一个变量进行读写、多进程访问同一个文件等情况。为了更好地演示竞态条件问题,我们可以采用另一种方式来测试。
假设有5000个请求,我们将库存初始值设置为0,每个请求对库存进行一次加操作。如果最终库存数量为5000,则说明所有请求均已成功增加库存,没有出现并发问题。反之,如果最终库存数量小于5000,则说明出现了竞态条件问题。
并发问题解释
测试本地锁存在的问题
不加锁下的并发
可以发现并没有加到 5000值仅有 114。出现并发问题
添加本地锁的并发
添加 synchronized 本地锁, 将 number 值重置为 0 进行压测。得数是 5000 看似没有出现并发性问题,但是 本地锁只能在单个JVM内部生效,无法跨服务、跨工程、跨服务器实现协调和同步。
测试集群下本地锁存在的问题
copy 2 份实例模拟集群环境 将 number 设置为 0 重新压测 5000次。最终结果 2360 原因是极限情况下 3 台服务可能同时放入一个线程 同时到达 都将 num 转换为某一个数字 ++. 理论值在 5000 / 3 至 5000 间
分布式锁实现
基于什么实现
实现方式
考量
基于 MySQL 关系型数据库
设置主键, 把写入主键做为 lock INSERT INTO lock_table (lock_name) VALUES (‘my_lock_name’) ON DUPLICATE KEY UPDATE lock_name = lock_name; DELETE FROM lock_table WHERE lock_name = ‘my_lock_name’;
MySQL 实现方式最简单 > Redis > Zk
基于 Redis 非关系型数据库
加锁 setnx, 解锁 del, 重试
Redis 性能最高 偏向 AP > Zk 偏向 CP > MySQL
基于 Zookeeper 实现
略
Zk 追求 CP 可靠性 > Redis & MySQL
基于 Redis 实现分布式锁
分布式锁加锁解锁添加 else 的必要性
执行方法本质是一个入栈出栈的执行过程, testLock() 第一次执行压栈。未获取到锁进入 if() 等待睡眠时间结束后进行重试,继续压栈。如果重试后还是没有获取到锁,继续进入 if() 等待睡眠结束然后压栈。直到获取锁成功执行完后依次出栈。
本质一个请求我们只希望它对 number 进行 + 1,在不加 else 的情况下,每次重试都会使程序重新调用 testLock() 方法,从而重新执行对资源 number 的 + 操作。这样就会导致一个请求重试多少次,就会让我们多 + 多少次,增加了资源的不必要修改和开销。
因此,在加了 else 的情况下,只有成功获取到锁时才进行对资源的修改,避免了重复操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public void testLock () { Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock" , "lock" ); if (!flag) { try { Thread.sleep(30 ); testLock(); } catch (InterruptedException e) { e.printStackTrace(); } } else { String number = redisTemplate.opsForValue().get("number" ); if (StringUtils.isBlank(number)) { redisTemplate.opsForValue().set("number" , "1" ); } int num = Integer.parseInt(number); redisTemplate.opsForValue().set("number" , String.valueOf(++num)); redisTemplate.delete("lock" ); } }
添加过期时间防止死锁
问题: 一个线程获取到锁 还没有执行到释放锁操作 服务器宕机. 其他线程获取不到锁 即使 服务器重启 这把锁也无法被释放掉. 其他线程一直执行递归操作 最终导致服务器资源耗尽而宕机
解决: 在获取锁的同时设置过期时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public void testLock () { Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock" , "lock" , 3 , TimeUnit.SECONDS); if (!flag) { try { Thread.sleep(30 ); testLock(); } catch (InterruptedException e) { e.printStackTrace(); } } else { String number = redisTemplate.opsForValue().get("number" ); if (StringUtils.isBlank(number)) { redisTemplate.opsForValue().set("number" , "1" ); } int num = Integer.parseInt(number); redisTemplate.opsForValue().set("number" , String.valueOf(++num)); redisTemplate.delete("lock" ); } }
添加 UUID 防误删
问题: A、B、C 三个服务都在尝试获取同一个锁,并且这个锁没有设置续期过期时间。
如果业务逻辑的执行时间是7s, A 服务获取锁 业务没有执行完 锁3秒被自动释放, B 服务获取到锁 业务没有执行完 锁3秒被自动释放, C 服务获取锁执行业务逻辑.A 服务业务执行完成 释放锁, 这时释放的是 C 的锁. 导致 C 业务只执行了 1s 就被别人释放. 最终等于没有锁(可能会释放其他服务器的锁)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public void testLock () { String uuid = UUID.randomUUID().toString(); Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock" , uuid, 3 , TimeUnit.SECONDS); if (!flag) { try { Thread.sleep(30 ); testLock(); } catch (InterruptedException e) { e.printStackTrace(); } } else { String number = redisTemplate.opsForValue().get("number" ); if (StringUtils.isBlank(number)) { redisTemplate.opsForValue().set("number" , "1" ); } int num = Integer.parseInt(number); redisTemplate.opsForValue().set("number" , String.valueOf(++num)); if (StringUtils.equals(redisTemplate.opsForValue().get("lock" ), uuid)) { redisTemplate.delete("lock" ); } } }
使用 LUA 脚本保证删除的原子性
A 线程获取锁并且执行完业务逻辑到 最后一步释放锁的环节, 查询 Redis 判断相等 准备进入 if() 进行删除锁, 到此处 锁过期
B 线程在 A 线程锁释放后,获取到锁。刚拿到锁 A 线程对锁进行释放。导致 B 线程实际上是无锁的(本质上是判断跟删除缺乏原子性导致的)
1 2 3 4 5 6 7 // 判断 redis 中 lock 值是否跟当前 uuid 一致, 如果一致则执行 del 指令 if redis.call('get' , lock) == uuid then return redis.call('del' , lock) else return 0 end
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public void testLock () { String uuid = UUID.randomUUID().toString(); Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock" , uuid, 3 , TimeUnit.SECONDS); if (!flag) { try { Thread.sleep(30 ); testLock(); } catch (InterruptedException e) { e.printStackTrace(); } } else { String number = redisTemplate.opsForValue().get("number" ); if (StringUtils.isBlank(number)) { redisTemplate.opsForValue().set("number" , "1" ); } int num = Integer.parseInt(number); redisTemplate.opsForValue().set("number" , String.valueOf(++num)); String script = "if redis.call('get', KEYS[1]) == ARGV[1] " + "then " + " return redis.call('del', KEYS[1])" + "else " + " return 0 " + "end" ; redisTemplate.execute(new DefaultRedisScript <>(script, Boolean.class), Arrays.asList("lock" ), uuid); } }
不可重入导致的死锁
不可重入导致的死锁是指一个线程已经获取了锁,在没有释放锁的情况下再次请求获取锁会导致死锁。通俗地说,一个线程在持有锁的情况下,再次去获取锁的时候会被自己给阻塞住,这样就无法继续执行,最终导致死锁。
假设线程 T1 先获取了 B 锁,然后在调用 A 方法时,由于 A 锁已经被其他线程占用了,因此 T1 会一直等待 A 锁的释放。此时,如果另外一个线程 T2 需要获取 B 锁,它会一直等待 T1 释放 B 锁,而 T1 又一直等待 A 锁的释放,这就导致了死锁。
名词
释意
可重入锁
当一个线程持有一个锁的时候,可以重复的获取该锁而不会导致死锁。
不可重入锁
不能重复获取锁,因为重复获取锁会导致死锁。
假设有两个方法 A 和 B,它们都需要对一个共享资源进行访问,并且都使用了锁来保证线程安全。如果 A 方法使用的是可重入锁,那么当它需要在 B 方法中被调用时,它可以继续持有自己的锁,而不需要去获取 B 方法中的锁,这样就避免了死锁的情况。
相反,如果 A 方法使用的是不可重入锁,当它需要在 B 方法中被调用时,如果 B 方法中也使用了锁,那么 A 方法就需要释放自己的锁才能获取 B 方法的锁,这样就会出现死锁的情况。
可重入加锁
参照 ReentrantLock 非公平锁加锁方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @ReservedStackAccess final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
我们可以通过 Redis 中的 hash 数据结构来记录,锁名称、唯一标识 uuid、可重入次数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 // 判断锁是否存在(exists), 如果不存在(0 ) 则直接获取锁(hset) if redis.call('exists' , 'lock' ) == 0 then redis.call('hset' , 'lock' , 'uuid' , 1 ) // 设置过期时间,防止服务器宕机导致的死锁问题 redis.call('expire' , 'lock' , 30 ) // 表示加锁成功 return 1 // 判断是否自己的锁(hexists), 如果是(1 )则重入(hincrby) elseif redis.call ('hexists' , 'lock' , 'uuid' ) == 1 then redis.call('hincrby' , 'lock' , 'uuid' , 1 ) // 一旦重入重入成功重新设置过期时间 redis.call('expire' , 'lock' , 30 ) // 表示重入成功 return 1 else // 锁存在 并且 与 当前线程的唯一标识不一致 可重入失败 return 0 end // key: lockName // argv: uuid, 过期时间 if redis.call('exists' , KEYS[1 ]) == 0 then redis.call('hset' , KEYS[1 ], ARGV[1 ], 1 ) redis.call('expire' , KEYS[1 ], 30 ) return 1 elseif redis.call ('hexists' , KEYS[1 ], ARGV[1 ]) == 1 then redis.call('hincrby' , KEYS[1 ], ARGV[1 ], 1 ) redis.call('expire' , KEYS[1 ], ARGV[2 ]) return 1 else return 0 end
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 if redis.call('exists' , 'lock' ) == 0 or redis.call ('hexists' , 'lock' , 'uuid' ) == 1 then redis.call('hexists' , 'lock' , 'uuid' , 1 ) redis.call('expire' , 'lock' , 30 ) return 1 else return 0 end // key: lockName // argv: uuid, 过期时间 if redis.call('exists' , KEYS[1 ]) == 0 or redis.call('hexists' , KEYS[1 ], ARGV[1 ]) == 1 then redis.call('hincrby' , KEYS[1 ], ARGV[1 ], 1 ) redis.call('expire' , KEYS[1 ], ARGV[2 ]) return 1 else return 0 end
可重入解锁
参照 ReentrantLock 非公平锁解锁方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
实现思路
判断自己的锁是否存在(hexists),如果不存在(0)则返回nil
如果自己的锁存在,则直接减1(hincrby -1),并判断减1后的值是否为0,为0则直接释放锁(del) 返回1
直接返回 0, 表示出来一次
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 // 判断自己的锁是否存在(hexists),如果不存在(0 )则返回nil if redis.call('hexists' , lock, 'uuid' ) == 0 then // nil 相当于 Java 中的 null, 接收到 null 值就表示在恶意释放锁 抛出异常 return nil // 如果自己的锁存在,则直接减1 (hincrby -1 ),并判断减1 后的值是否为0 ,为0 则直接释放锁(del) 返回1 elseif redis.call('hincrby' , lock, 'uuid' , -1 ) == 0 then // redis.call('del' , lock) // return 1 // redis 删除成功默认返回值就是 1 return redis.call('del' , lock) else // 直接返回 0 , 表示出来一次 return 0 end key: lockName argv: uuid if redis.call('hexists' , KEYS[1 ], ARGV[1 ]) == 0 then return nil elseif redis.call('hincrby' , KEYS[1 ], ARGV[1 ], -1 ) == 0 then return redis.call('del' , KEYS[1 ]) else return 0 end
自动续期
可以 通过 Timer 去完成定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void main (String[] args) { System.out.println("定时任务的初始时间 " + System.currentTimeMillis()); new Timer ().schedule(new TimerTask () { @Override public void run () { System.out.println("定时器定时任务: " + System.currentTimeMillis()); } },5000 , 10000 ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 if redis.call('hexists' , lock, uuid) == 1 then // 重置成功返回 1 ,key 不存在设置过期时间返回 0 return redis.call('expire' , lock, 30 ) else return 0 key: lockName arg : uuid, 过期时间if redis.call('hexists' , KEYS[1 ], ARGV[1 ]) == 1 then return redis.call('expire' , KEYS[1 ], ARGV[2 ]) else return 0
防止脑裂:RedLock 算法
如果不搭建集群可能会出现单点故障问题,搭建集群会出现如果某个服务从旧的主节点获取了锁,但是在旧的主节点挂掉后,某个从节点升级为新的主节点并不知道已经有人获取了锁,那么就会出现两个客户端同时获取了同一个锁的情况,这将导致数据不一致和并发问题。这种情况称为“脑裂”(Split Brain)问题。
Redisson
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet
, Set
, Multimap
, SortedSet
, Map
, List
, Queue
, BlockingQueue
, Deque
, BlockingDeque
, Semaphore
, Lock
, AtomicLong
, CountDownLatch
, Publish / Subscribe
, Bloom filter
, Remote service
, Spring cache
, Executor service
, Live Object service
, Scheduler service
) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。