前言
这应该是分布式锁演化的最后一个章节了,相信很多小伙伴们看完这个章节之后在应对高并发的情况下,如何保证线程安全心里肯定也会有谱了。在实际的项目中也可以参考一下老猫的github上的例子,当然代码没有经过特意的封装,需要小伙伴们自己再好好封装一下。那么接下来,就和大家分享一下基于zookeeper的分布式锁,由于此篇主要分享的是zk的分布式锁,所以对于zk本身的相关知识点,并不会涉及很多。和分布式锁实现有关的zk知识点会提及。
Zookeeper实现分布式锁
何为ZK?(为了打字简单,后续老猫均以ZK来代替zookeeper),相信很多接触到Dubbo框架的小伙伴可能听说过ZK,但是具体也没有详细地去学习ZK。那么又如何利用ZK来实现分布式锁呢?以下我们一个个来看。
什么是ZK?
对于没有接触过ZK的小伙伴,老猫给个非专业但是挺实用的解释,ZK是一个分布式协调服务,该服务由N多个节点构成,每个节点均可存储数据。
数据结构
在了解锁原理之前我们先来看一下ZK的数据结构,具体如下:
在 Zookeeper 中,每一个数据节点都是一个 ZNode,上图根目录下有两个节点,分别是:app1 和 app2,其中 app1 下面又有三个子节点。那么我们来看看 ZNode 数据结构到底是什么样子的呢。首先我们来了解 ZNode 的类型。
Zookeeper 节点类型可以分为三大类:持久性节点(Persistent)、瞬时性节点(Ephemeral)、顺序性节点(Sequential)。现实开发中在创建节点的时候通过组合可以生成以下四种节点类型:持久节点、持久顺序节点、瞬时节点、瞬时有序节点。
(1) 持久节点:节点被创建后会一直存在服务器,直到删除操作主动清除,这种节点也是最常见的类型。
(2) 持久顺序节点:有顺序的持久节点,节点特性和持久节点是一样的,只是额外特性表现在顺序上。顺序特性实质是在创建节点的时候,会在节点名后面加上一个数字后缀,来表示其顺序。
(3) 瞬时节点:会被自动清理掉的节点,它的生命周期和客户端会话绑在一起,客户端会话结束,节点会被删除掉。与持久性节点不同的是,临时节点不能创建子节点。
(4)瞬时有顺序节点:有顺序的临时节点,和持久顺序节点相同,在其创建的时候会在名字后面加上数字后缀。
那么此次我们的ZK分布式锁就是基于ZK的临时有序节点实现的,也就是上述的第四种节点。当然光凭借第四种临时有序节点是不够的,我们还需要用到ZK的另外一个比较重要的概念,那就是“ZK观察器”。
ZK观察器
ZK观察器可以监测到节点的变动,如果节点发生变更会通知到客户端。我们可以设置观察器的三个方法:getData(),getChildrean(),exists()。观察器有一个比较重要的特性就是只能监控一次,再监控需要重新设置。
原理流程
(1)利用ZK的瞬时有序节点的特性。
(2)多线程并发创建瞬时节点时,得到有序的序列。
(3)序号最小的线程获得锁。
(4)其他的线程则监听自己节点序号的前一个序号。
(5)前一个线程执行完成,删除自己序号的节点。
(6)下一个序号的线程得到通知,继续执行。
(7)依次类推
通过上述流程大家就会发现,其实在创建节点的时候,就已经确定了线程的执行顺序。大家看完这个流程可能有点模糊,咱们继续看下面的图解,老猫相信大家心里就会有一个更加清晰的认知。
【流程一】我们有四个线程,分别是线程A、线程B、线程C、线程D。此时线程并发运行,这样就会在我们的ZK中创建四个临时有序节点,按照先来后到的顺序分别是1、2、3、4。此时按照我们流程描述中的第三点描述由于线程A对应的序号最小,所以A优先获取锁。
【流程二】再依次看第二个流程,此时当A获取锁之后,线程B的监听器会去监听1节点的执行情况,线程C的监听器会去监听2节点的执行情况,线程D的监听器会去监听3节点的执行情况依次类推。
【流程三】当线程A执行完毕之后会删除相关的节点1,此时会被线程B监听到,于是线程B开始执行,有线程C监听等待着线程B节点的释放,依次类推,直到这四个线程都执行完毕。
通过以上的图解,老猫觉得很多小伙伴对ZK锁的实现原理应该已经知道了,当然对ZK还是比较陌生的小伙伴也可以专门抽时间去熟悉一下ZK。接下来就和老猫一起来看一下具体的代码又是如何实现的吧。
纯手撸ZK分布式锁代码
基于上述的流程,我们手撸一下核心的代码,首先我们搭建的zk服务器必须和项目中使用的pom依赖是同一版本,这样也才能够避免出问题,由于老猫使用的是zk的3.6.2版本,所以老猫引入的pom如下:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.2</version>
</dependency>
手写zk锁的逻辑主要也是根据上述原理实现,代码中有比较晦涩难懂的地方,老猫也写了详细的备注,还有不 明白的铁子可以给老猫留言:
/**
* @author kdaddy@163.com
* @date 2021/1/16 10:25
* @公众号 程序员老猫
*/
@Slf4j
@Service
public class ZKLockUtil implements AutoCloseable, Watcher {
private ZooKeeper zooKeeper;
private String zNode;
public ZKLockUtil() throws Exception {
this.zooKeeper = new ZooKeeper("localhost:2181",100000,this);
}
public boolean getLock(String businessCode){
try {
// 首先创建业务根节点,类比之前的redis锁的key以及mysql锁的businessCode
Stat stat = zooKeeper.exists("/"+businessCode,false);
if(stat == null){
//表示创建一个业务根目录,此节点为持久节点,另外的由于在本地搭建的zk没有设置密码,所以采用OPEN_ACL_UNSAFE模式
zooKeeper.create("/" +businessCode,businessCode.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
//创建该目录下的有序瞬时节点,假如我们的订单业务编号是"order",那么第一个有序瞬时节点应该是/order/order_0000001
zNode =zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
/**
* 按照之前原理的时候的逻辑,
* 我们会对所有的节点进行排序并且序号最小的那个节点优先获取锁,
* 其他节点处于监听状态
*/
//此处获取所有子节点,注:之前文章中提及的getData(),getChildrean(),exists()的第二个参数表示是否设置观察器,ture为设置,false表示不设置
List<String> childrenNodes = zooKeeper.getChildren("/"+businessCode,false);
//子节点排序
Collections.sort(childrenNodes);
//获取序号最小的子节点
String minNode = childrenNodes.get(0);
//如果创建的节点是最小序号的节点,那么就获得锁
if(zNode.endsWith(minNode)){
return true;
}
//否则监听前一个节点的情况
/**
* 到这里说明创建的zNode为第二个或者第三第四个等节点
* 此处比较晦涩用代入法去理解
* 如果zNode是第二个节点,那么监听的就是第一个最小节点,
* 如果zNode是第三个节点,那么此时上一个节点就是循环中的当前那个节点。
* 需要细品
*/
String lastNode = minNode;
for (String node : childrenNodes){
//如果瞬时节点为非第一个节点,那么监听前一个节点
if(zNode.endsWith(node)){
zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
break;
}else {
lastNode = node;
}
}
//并发情况下wait方法让出锁,但是由于并发情景下,为了避免释放的时候错乱因此加上synchronized
synchronized (this){
wait();
}
//当被唤起的时候相当于轮到了,当前拿到了锁,所以return true
return true;
}catch (Exception e){
e.printStackTrace();
}
return false;
}
@Override
public void process(WatchedEvent watchedEvent) {
//如果监听到节点被删除,那么则会通知下一个线程
if(watchedEvent.getType() == Event.EventType.NodeDeleted){
synchronized (this){
notify();
}
}
}
@Override
public void close() throws Exception {
zooKeeper.delete(zNode,-1);
zooKeeper.close();
log.info("我已经释放了锁!");
}
}
具体service层的代码老猫也做了更改,由于只看锁,所以在此老猫将相关落订单的逻辑去除了,对于上述工具类,可以进行如下使用:
/**
* @author kdaddy@163.com
* @date 2021/1/16 10:25
* @公众号 程序员老猫
*/
@Service
@Slf4j
public class ZKLockService {
@Autowired
private ZKLockUtil zkLockUtil;
private String ORDER_KEY = "order_kd";
public Integer createOrder() throws Exception{
log.info("进入了方法");
try {
if (zkLockUtil.getLock(ORDER_KEY)) {
log.info("拿到了锁");
//此处为了手动演示并发,所以我们暂时在这里休眠
Thread.sleep(6000);
}
}catch (Exception e){
e.printStackTrace();
}finally {
zkLockUtil.close();
}
log.info("方法执行完毕");
return 1;
}
}
上述即为实现代码,相关的逻辑,老猫也在代码的备注中阐释。如果还有不清楚的小伙伴可以给老猫留言。当然想要完整测试代码也可以去老猫的github地址下载。地址:https://github.com/maoba/kd-distribute
curator客户端的使用
相信有很多还是会有很多小伙伴会说,上述的流程逻辑比较绕,太让人头疼了。那么福利来了,其实关于ZK锁的话还有可以用封装比较完善的客户端,那就是curator。这个客户端本身就已经实现了ZK的分布式锁,咱们开箱调用即可。如果有更多的小伙伴想要了解curator,也可以去官网去研究一番,具体的地址为:http://curator.apache.org/。当然老猫下面的代码也是根据官网的步骤写出来的。具体代码实现如下:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
由于curator每次启动都要连接zk,所以老猫干脆将其放在springboot的启动中。其实上面手写的通过构造方法连接zk的方式也可以做一下改造。
/**
* @author ktdaddy
* @公众号 程序员老猫
*/
@SpringBootApplication
@MapperScan("com.kd.distribute.dao")
public class DistributeApplication {
public static void main(String[] args) {
SpringApplication.run(DistributeApplication.class, args);
}
//启动服务的时候连接zk,并且指定开始使用和结束使用的方法
@Bean(initMethod="start",destroyMethod = "close")
public CuratorFramework getCuratorFramework() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
return client;
}
}
具体锁的使用代码如下:
/**
* @author kdaddy@163.com
* @date 2021/1/16 22:49
* @公众号 程序员老猫
*/
@Service
@Slf4j
public class CuratorLockService {
private String ORDER_KEY = "order_kd";
@Autowired
private CuratorFramework client;
public Integer createOrder() throws Exception{
log.info("进入了方法");
InterProcessMutex lock = new InterProcessMutex(client, "/"+ORDER_KEY);
try {
if (lock.acquire(30, TimeUnit.SECONDS)) {
log.info("拿到了锁");
//此处为了手动演示并发,所以我们暂时在这里休眠6s
Thread.sleep(6000);
}
}catch (Exception e){
e.printStackTrace();
}finally {
try {
log.info("我释放了锁!!");
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
log.info("方法执行完毕");
return 1;
}
}
相当简单,当然有兴趣研究源码实现的小伙伴也可以查看一下InterProcessMutex的相关的源码。在此老猫不赘述。
分布式锁的对比
到此,我们将分布式系统的锁的解决方案都已经和大家分享过了,最终咱们来进行一个对比,具体如下:
看了上面这个比较之后,其实在我们的实际项目中,还是推荐现成的 curator 实现方式以及redisson实现方式,因为毕竟目前来说是相当成熟的方案,不推荐由我们自己的代码去实现。所以小伙伴们在选择的时候就不用纠结了。
写在最后
老猫花了将近半个月的时候整理和输出了单体锁演化到分布式锁的解决方案,熬了比较多的夜,如果能给大家带来收获,那是再好不过的了。当然看到这里也希望能得到你的点赞、关注和转发。你的支持,是老猫原创的最大动力,后面老猫会带给大家更多分布式系统的解决方案。也希望能得到你的持续关注。更多精彩欢迎大家关注公众号“程序员老猫”