原文链接:https://www.changxuan.top/?p=1230
在单体架构向分布式集群架构演进的过程中,项目中必不可少的一个功能组件就是分布式锁。在开发团队有技术积累的情况下,做为团队的一个“工具人”往往有限的时间都投入到了业(C)务(U)开(R)发(D)上,并不会去深究工具类中的分布式锁到底是如何实现的。大家只需要清楚如何使用某个同事写好的 Redis 工具类就可以了。所以,今天就带大家从零开始实现一个基于Redis的可以在项目中直接使用的分布式锁。
首先,需要搞清楚一个问题,我们为什么需要分布式锁或者说为什么需要锁?下面我们通过一张图来说明这个问题,
在上面这张图中,同时有两个线程 Thread A 和 Thread B 要做同一件事,你可以理解它们为要同时执行一个代码块。但是,执行这个代码块有个特殊的要求:不能有多个线程同时执行,不然系统数据就会出乱子!所有需要执行这段代码的线程,需要挨个排队来执行。所以,此时就需要有一个所有线程都能访问到的一个变量,根据这个变量的状态来判断此时是否有其它线程正在执行,来决定当前线程是否能够执行,那么这个变量就是“锁”。假如设变量为 static volitate int lock = 0;
,当 lock
的值为 0 时,表明此时没有线程在执行这段有特殊要求的代码,当 lock
的值为 1 时,表明此时有其它线程在执行这段有特殊要求的代码。当某个线程获取到 lock
值为 0,且将 lock
值改为 1 的过程,称为成功获取锁(注:此过程需要是原子性的);当该线程执行完这段代码后,将 lock
的值改为 0 的操作称为释放锁。
在分布式系统中,由于子系统需要支持水平扩展所以就不能把内存变量的状态做为“一把锁”了。不过我们可以把变量放到 Redis 中,这样所有节点的线程都能够访问和操作了。
一、 一把简单的“锁”
“一口吃不成个大胖子”,我们先实现一个最简单 Redis 锁。我们通过逐渐发现问题并解决的过程,来加深理解。
根据前文所描述的锁的基本原理,我首先写了两个方法,一个是获取锁 boolean lock(String key)
,一个是释放锁 void unlock(String key)
。
@RedisUtil.java
/**
* 加锁
* @param key 锁名称
* @return lock 是否获取锁, true:获取, false:未获取
*/
public boolean lock(String key) {
boolean lock;
try {
// setIfAbsent 等价于 Redis 的 setnx 命令,具有原子性
lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "LOCK"));
} catch (Exception e) {
log.error("获取锁:{},出现异常{}", key, e);
return false;
}
return lock;
}
/**
* 释放锁
* @param key 锁名称
*/
public void unlock(String key) {
if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
redisTemplate.delete(key);
}
}
下面我们就写代码来测试一下,
测试思路:
创建十个线程调用 work() 方法
加锁:方法获取到锁的线程对变量 count 执行1000次自增操作,未获取到锁的线程则不执行自增操作。
不加锁:由于自增操作是非原子性的,所以最终 count 的结果会小于 10000 大于 1000 。
@WorkService.java
public static int count = 0;
private static final int TIME = 1000;
public void work() {
String key = "TEST_KEY";
if (redisUtil.lock(key)) {
log.info("线程:{},已经获取锁", Thread.currentThread().getName());
try {
for (int i = 0; i < TIME; i++) {
WorkService.count++;
}
}catch (Exception e) {
log.error("发生错误",e);
}finally {
redisUtil.unlock(key);
log.info("线程:{},已经释放锁", Thread.currentThread().getName());
}
} else {
log.info("线程:{},未获取到锁", Thread.currentThread().getName());
}
}
public void notLockWork() {
for (int i = 0; i < TIME; i++) {
WorkService.count++;
}
}
@WorkServiceTest.java
@Test
void work() throws Exception {
CountDownLatch downLatch = new CountDownLatch(10);
LinkedList<Thread> threads = new LinkedList<>();
for (int i = 0; i < 10; ++i) {
Thread thread = new Thread(() -> {
// 加锁测试
workService.work();
// 未加锁测试
// workService.notLockWork()
downLatch.countDown();
});
threads.add(thread);
}
for (Thread thread : threads) {
thread.start();
}
downLatch.await();
System.out.println("count = " + WorkService.count);
}
加锁测试结果(控制台输出)
2021-01-17 16:52:35.256 INFO 8648 --- [Thread-150] com.cxcoder.services.WorkService : 线程:Thread-150,已经获取锁
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-152] com.cxcoder.services.WorkService : 线程:Thread-152,未获取到锁
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-149] com.cxcoder.services.WorkService : 线程:Thread-149,未获取到锁
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-154] com.cxcoder.services.WorkService : 线程:Thread-154,未获取到锁
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-148] com.cxcoder.services.WorkService : 线程:Thread-148,未获取到锁
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-153] com.cxcoder.services.WorkService : 线程:Thread-153,未获取到锁
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-151] com.cxcoder.services.WorkService : 线程:Thread-151,未获取到锁
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-146] com.cxcoder.services.WorkService : 线程:Thread-146,未获取到锁
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-147] com.cxcoder.services.WorkService : 线程:Thread-147,未获取到锁
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-155] com.cxcoder.services.WorkService : 线程:Thread-155,未获取到锁
2021-01-17 16:52:35.320 INFO 8648 --- [Thread-150] com.cxcoder.services.WorkService : 线程:Thread-150,已经释放锁
count = 1000
未加锁测试结果(控制台输出)
count = 3287
从控制台输出结果来看,目前的锁已经可以用来限制某块代码在某一时刻只能有一个线程在执行。但是也能够发现,该锁的实现机制还存在一些问题和不能满足的需求。
例如,如果在项目里中出现”恶意代码“或者不规范代码的情况下则会出现预料之外的结果。看下面的例子,
@WorkService.java
public void work(){
String key = "TEST_KEY";
try {
if (redisUtil.lock(key)){
log.info("线程:{},获取锁", Thread.currentThread().getName());
... ... //(A)
}
}catch (Exception e) {
log.error("发生错误",e);
}finally {
redisUtil.unlock(key);
log.info("线程:{},释放锁", Thread.currentThread().getName());
}
}
如果 void work()
是上面的这种写法,会出现什么问题呢?当有线程X
获取到锁后,正在执行 A 处的代码时。这时 线程B
来到后获取锁失败,却执行了 finally
里的代码将锁给释放了。此时 线程A
还在执行的过程中,又来了 线程C
获取到锁后也开始执行。所以,这个 Redis 锁的实现机制存在一个比较严重的问题是某个线程所持有的锁可以被其它线程随意给释放掉。另外,从控制台输出的结果中可以看出在某个线程持有锁的时间段内,其它线程是未被阻塞的。目前这个锁应该被称为存在问题的基于Redis的非阻塞的分布式锁。