• 欢迎访问开心洋葱网站,在线教程,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站,欢迎加入开心洋葱 QQ群
  • 为方便开心洋葱网用户,开心洋葱官网已经开启复制功能!
  • 欢迎访问开心洋葱网站,手机也能访问哦~欢迎加入开心洋葱多维思维学习平台 QQ群
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏开心洋葱吧~~~~~~~~~~~~~!
  • 由于近期流量激增,小站的ECS没能经的起亲们的访问,本站依然没有盈利,如果各位看如果觉着文字不错,还请看官给小站打个赏~~~~~~~~~~~~~!

一文带你学会AQS和并发工具类的关系

JAVA相关 雪中孤狼 1882次浏览 0个评论

1. 存在的意义

  AQS(AbstractQueuedSynchronizer)是JAVA中众多锁以及并发工具的基础,其底层采用乐观锁,大量使用了CAS操作, 并且在冲突时,采用自旋方式重试,以实现轻量级和高效地获取锁。

  提供一个框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关的同步器(semaphore等)。 此类旨在为大多数依赖单个原子int值表示状态的同步器提供有用的基础。 子类必须定义更改此状态的受保护方法,并定义该状态对于获取或释放此对象而言意味着什么。 鉴于这些,此类中的其他方法将执行所有排队和阻塞机制。 子类可以维护其他状态字段,但是相对于同步,仅跟踪使用方法getState,setState和compareAndSetState操作的原子更新的int值。

  此类支持默认独占模式和共享模式之一或两者。 当以独占方式进行获取时,其他线程尝试进行的获取将无法成功。 由多个线程获取的共享模式可能(但不一定)成功。 该类不“理解”这些差异,当共享模式获取成功时,下一个等待线程(如果存在)还必须确定它是否也可以获取。 在不同模式下等待的线程共享相同的FIFO队列。 通常,实现子类仅支持这些模式之一,但例如可以在ReadWriteLock发挥作用。 仅支持独占模式或仅支持共享模式的子类无需定义支持未使用模式的方法。

2. 核心知识点

2.1 state

private volatile int state; // 同步状态

  state是整个工具的核心,通常整个工具都是在设置和修改状态,很多方法的操作都依赖于当前状态是什么。由于状态是全局共享的,一般会被设置成volatile类型,为了保证其修改的可见性;

2.2 CLH队列

  AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。队列采用的是悲观锁的思想,表示当前所等待的资源,状态或者条件短时间内可能无法满足。因此,它会将当前线程包装成某种类型的数据结构,扔到一个等待队列中,当一定条件满足后,再从等待队列中取出。

2.3 CAS操作

  CAS操作是最轻量的并发处理,通常我们对于状态的修改都会用到CAS操作,因为状态可能被多个线程同时修改,CAS操作保证了同一个时刻,只有一个线程能修改成功,从而保证了线程安全。CAS采用的是乐观锁的思想,因此常常伴随着自旋,如果发现当前无法成功地执行CAS,则不断重试,直到成功为止。

3. 核心实现原理

3.1 作为同步器的基础

  要将此类用作同步器的基础,请使用getState,setState或compareAndSetState检查或修改同步状态,以重新定义以下方法(如适用):

  1. tryAcquire

独占方式,arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。

  1. tryRelease

独占方式,arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。

  1. tryAcquireShared

共享方式,arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

  1. tryReleaseShared

共享方式,arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。

  1. isHeldExclusively

该线程是否正在独占资源。只有用到Condition才需要去实现它。

  默认情况下,这些方法中的每一个都会引发UnsupportedOperationException 。 这些方法的实现必须在内部是线程安全的,并且通常应简短且不阻塞。 定义这些方法是使用此类的唯一受支持的方法。 所有其他方法都被声明为final方法,因为它们不能独立变化。

3.2 同步状态state

  AQS中维护了一个名为state的字段,意为同步状态,是由volatile修饰的,用于展示当前临界资源的获锁情况。

private volatile int state;

下面提供了几个访问这个state字段的方法:
返回同步状态的当前值。 此操作具有volatile读取的内存语义

protected final int getState() {
    return state;
}

设置同步状态的值。 此操作具有volatile写操作的内存语义。

protected final void setState(int newState) {
    state = newState;
}

  如果当前状态值等于期望值,则以原子方式将同步状态设置为给定的更新值。 此操作具有volatile读写的内存语义

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

  这几个方法都是Final修饰的,说明子类中无法重写它们。我们可以通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式
state的值即表示了锁的状态,state为0表示锁没有被占用,state大于0表示当前已经有线程持有该锁,这里之所以说大于0而不说等于1是因为可能存在可重入的情况。你可以把state变量当做是当前持有该锁的线程数量。

public abstract class AbstractOwnableSynchronizer
    protected AbstractOwnableSynchronizer() {
      
    private transient Thread exclusiveOwnerThread;
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

exclusiveOwnerThread 属性的值即为当前持有锁的线程独占模式获取锁流程:

共享模式获取锁流程:

3.3 数据结构

  1. AQS中最基本的数据结构是Node,Node即为CLH变体队列中的节点。
static final class Node {
    // 表示线程以共享的模式等待锁
    static final Node SHARED = new Node();
    // 表示线程正在以独占的方式等待锁
    static final Node EXCLUSIVE = null;
    // 为1,表示线程获取锁的请求已经取消了
    static final int CANCELLED =  1;
    // 为-1,表示线程已经准备好了,就等资源释放了
    static final int SIGNAL    = -1;
    // 为-2,表示节点在等待队列中,节点线程等待唤醒
    static final int CONDITION = -2;
    // 为-3,当前线程处在SHARED情况下,该字段才会使用
    static final int PROPAGATE = -3;
    // 当前节点在队列中的状态
    volatile int waitStatus;
    // 前驱节点
    volatile Node prev;
    // 后续节点
    volatile Node next;
    // 当前节点的线程
    volatile Thread thread;
    // 指向下一个处于CONDITION状态的节点
    Node nextWaiter;
    ...
}
  1. AQS中CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
// 队列头节点
private transient volatile Node head;
// 队列尾节点
private transient volatile Node tail;

  在AQS中的队列是一个FIFO队列,它的head节点永远是一个虚拟结点(dummy node), 它不代表任何线程,因此head所指向的Node的thread属性永远是null。但是我们不会在构建过程中创建它们,因为如果没有争用,这将是浪费时间。 而是构造节点,并在第一次争用时设置头和尾指针。只有从次头节点往后的所有节点才代表了所有等待锁的线程。也就是说,在当前线程没有抢到锁被包装成Node扔到队列中时,即使队列是空的,它也会排在第二个,我们会在它的前面新建一个虚拟节点。

4. 获取锁实现

4.1 ReentrantLock 独占锁内部结构

构造函数源代码

// 默认创建非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}
// 通过传值为true来进行创建公平锁
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock 里面有三个内部类:

  1. 一个是抽象的 Sync 实现了 AbstractQueuedSynchronizer
  2. NonfairSync 继承了 Sync
  3. FairSync 继承了 Sync

4.2 非公平锁的实现

ReentrantLock 种获取锁的方法

public void lock() {
    sync.lock();
}

ReentrantLock 的非公平锁实现

static final class NonfairSync extends Sync {
    final void lock() {
    // 如果设置state的值从0变为1成功
        if (compareAndSetState(0, 1))
        // 则将当前线程设置为独占线程
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

compareAndSetState(0,1)

protected final boolean compareAndSetState(int expect, int update) {
    // 通过unsafe.compareAndSwapInt方法来进行设置值
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

stateOffset 为AQS种维护的state属性的偏移量

setExclusiveOwnerThread(Thread.currentThread());

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

acquire(1); 调用的是AQS 中的acquire(int arg) 方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire(arg) 该方法是protected的由子类去具体实现的

  我们需要看的是NonfairSync中实现的tryAcquire方法,里面又调用了nonfairTryAcquire方法,再进去看看

static final class NonfairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

nonfairTryAcquire(int acquires) 方法实现

final boolean nonfairTryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取当前state的值
    int c = getState(); 
    if (c == 0) {
      // 看看设置值是否能成功
            if (compareAndSetState(0, acquires)) {
           // 则将当前线程设置为独占线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
 // 返回由setExclusiveOwnerThread设置的最后一个线程;如果从不设置,则返回null 
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 设置state的值
        setState(nextc);
        return true;
    }
    return false;
}

acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法实现,先看里addWaiter(Node.EXCLUSIVE)方法注意:Node.EXCLUSIVE 此时是空值,所以mode 就是空的,所以此时创建的Node节点中的nextWaiter是空值。

static final class Node {
  Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
  }
}
private Node addWaiter(Node mode) {
    // 创建一个新的节点
    Node node = new Node(Thread.currentThread(), mode);
    // 将当前CLH队列的尾部节点赋予给 pred
    Node pred = tail;
    if (pred != null) { // 如果尾节点不为空
        node.prev = pred; // 将当前node节点的前驱节点指向CLH队列的尾部节点
        if (compareAndSetTail(pred, node)) { // CAS设置值
            pred.next = node; // CLH队列的尾部节点的后继节点指向新的node节点
            return node;
        }
    }
    enq(node);
    return node;
}

如果CLH队列的尾部节点为空值的话,执行enq(node)方法

// 通过CAS方式设置队列的头节点
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
// 通过CAS方式设置队列的尾部节点
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
// 节点入队操作
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 如果尾部节点为空值
        if (t == null) {
        // 则进行初始化一个节点,将其设置为头节点
            if (compareAndSetHead(new Node()))
                tail = head; //头尾指向同一个空节点
        } else {
      // 如果尾部节点不为空,那么将传进来的入队节点的前驱节点指向当前队列的尾部节点
            node.prev = t;
      // 将当前传进来的入队节点设置为当前队列的尾部节点
            if (compareAndSetTail(t, node)) { 
          // 将当前队列的尾部节点的后继节点指向传进来的新节点
                 t.next = node;
                return t;
            }
        }
    }
}

查看 acquireQueued 方法实现

// 获取当前节点的前驱节点
final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}
// 检查并更新无法获取的节点的状态。 如果线程应阻塞,则返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//SIGNAL这个状态就有点意思了,它不是表征当前节点的状态,而是当前节点的下一个节点 //的状态。当一个节点的waitStatus被置为SIGNAL,就说明它的下一个节点(即它的后继 // 节点)已经被挂起了(或者马上就要被挂起了),因此在当前节点释放了锁或者放弃获取 // 锁时,如果它的waitStatus属性为SIGNAL,它还要完成一个额外的操作——唤醒它的后继节点。
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
     // 当前节点的 ws > 0, 则为 Node.CANCELLED 说明前驱节点                                        // 已经取消了等待锁(由于超时或者中断等原因)
        // 既然前驱节点不等了, 那就继续往前找, 直到找到一个还在等待锁的节点
        // 然后我们跨过这些不等待锁的节点, 直接排在等待锁的节点的后面 
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
     // 前驱节点的状态既不是SIGNAL,也不是CANCELLED
        // 用CAS设置前驱节点的ws为 Node.SIGNAL,给自己定一个闹钟
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
// 停止的便捷方法,然后检查是否中断
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
// 取消正在进行的获取尝试
private void cancelAcquire(Node node) {
    if (node == null)
        return;
    node.thread = null;
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node;
    }
}
// 能执行到该方法, 说明addWaiter 方法已经成功将包装了当前Thread的节点添加到了等待队列的队尾
// 该方法中将再次尝试去获取锁
// 在再次尝试获取锁失败后, 判断是否需要把当前线程挂起
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
         // 获取当前节点的前驱节点
            final Node p = node.predecessor();
         // 在前驱节点就是head节点的时候,继续尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 将当前线程挂起,使CPU不再调度它
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) 
            cancelAcquire(node);
    }
}

  为什么前面获取锁失败了, 这里还要再次尝试获取锁呢?
首先, 这里再次尝试获取锁是基于一定的条件的,即:当前节点的前驱节点就是HEAD节点,因为我们知道,head节点就是个虚拟节点,它不代表任何线程,或者代表了持有锁的线程,如果当前节点的前驱节点就是head节点,那就说明当前节点已经是排在整个等待队列最前面的了。

setHead(node); 方法

// 这个方法将head指向传进来的node,并且将node的thread和prev属性置为null
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

  可以看出,这个方法的本质是丢弃原来的head,将head指向已经获得了锁的node。但是接着又将该node的thread属性置为null了,这某种意义上导致了这个新的head节点又成为了一个虚拟节点,它不代表任何线程。为什么要这样做呢,因为在tryAcquire调用成功后,exclusiveOwnerThread属性就已经记录了当前获取锁的线程了,此处没有必要再记录。这某种程度上就是将当前线程从等待队列里面拿出来了,是一个变相的出队操作。
shouldParkAfterFailedAcquire(Node pred, Node node)方法

  1. 如果为前驱节点的waitStatus值为 Node.SIGNAL 则直接返回 true
  2. 如果为前驱节点的waitStatus值为 Node.CANCELLED (ws > 0), 则跳过那些节点, 重新寻找正常等待中的前驱节点,然后排在它后面,返回false
  3. 其他情况, 将前驱节点的状态改为 Node.SIGNAL, 返回false

acquireQueued方法中的Finally代码

private void cancelAcquire(Node node) {
  // 将无效节点过滤
    if (node == null)
        return;
  // 设置该节点不关联任何线程,也就是虚节点
    node.thread = null;
    Node pred = node.prev;
  // 通过前驱节点,跳过取消状态的node
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
  // 获取过滤后的前驱节点的后继节点
    Node predNext = pred.next;
  // 把当前node的状态设置为CANCELLED
    node.waitStatus = Node.CANCELLED;
  // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
  // 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
    // 如果当前节点不是head的后继节点,
    // 1.判断当前节点前驱节点的是否为SIGNAL
    // 2.如果不是,则把前驱节点设置为SINGAL看是否成功
    // 如果1和2中有一个为true,再判断当前节点的线程是否为null
    // 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
        if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
   // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

4.3 非公平锁获取流程图

非公平锁获取锁成功的流程图

非公平锁获取锁失败的流程图

5.释放锁实现

5.1释放锁代码分析

  尝试释放此锁。如果当前线程是此锁的持有者,则保留计数将减少。 如果保持计数现在为零,则释放锁定。 如果当前线程不是此锁的持有者,则抛出IllegalMonitorStateException。

## ReentrantLock
public void unlock() {
    sync.release(1);
}

sync.release(1) 调用的是AbstractQueuedSynchronizer中的release方法

## AbstractQueuedSynchronizer
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

分析tryRelease(arg)方法

tryRelease(arg)该方法调用的是ReentrantLock中

protected final boolean tryRelease(int releases) {
// 获取当前锁持有的线程数量和需要释放的值进行相减
    int c = getState() - releases; 
    // 如果当前线程不是锁占有的线程抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果此时c = 0就意味着state = 0,当前锁没有被任意线程占有
    // 将当前所的占有线程设置为空
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 设置state的值为 0
    setState(c);
    return free;
}

  如果头节点不为空,并且waitStatus != 0,唤醒后续节点如果存在的话。
这里的判断条件为什么是h != null && h.waitStatus != 0?

  因为h == null的话,Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。

  1. h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒
  2. h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒
private void unparkSuccessor(Node node) {
// 获取头结点waitStatus
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的下一个节点
    Node s = node.next;
//如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 就从尾部节点开始找往前遍历,找到队列中第一个waitStatus<0的节点。
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
  // 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点唤醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

  为什么要从后往前找第一个非Cancelled的节点呢?
看一下addWaiter方法

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

  我们从这里可以看到,节点入队并不是原子操作,也就是说,node.prev = pred, compareAndSetTail(pred, node) 这两个地方可以看作Tail入队的原子操作,但是此时pred.next = node;还没执行,如果这个时候执行了unparkSuccessor方法,就没办法从前往后找了,所以需要从后往前找。还有一点原因,在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的Node
所以,如果是从前往后找,由于极端情况下入队的非原子操作和CANCELLED节点产生过程中断开Next指针的操作,可能会导致无法遍历所有的节点。所以,唤醒对应的线程后,对应的线程就会继续往下执行。

5.2 释放锁流程图

6.注意

  由于篇幅较长公平锁的实现在下一篇的博客中讲述,谢谢大家的关注和支持!有问题希望大家指出,共同进步!!!


开心洋葱 , 版权所有丨如未注明 , 均为原创丨未经授权请勿修改 , 转载请注明一文带你学会AQS和并发工具类的关系
喜欢 (0)

您必须 登录 才能发表评论!

加载中……