AQS技术全面学习只看这篇
前言
在 Java 并发编程中,AbstractQueuedSynchronizer(简称 AQS)是 java.util.concurrent 包的基石。几乎所有高级同步工具(如 ReentrantLock、Semaphore、CountDownLatch 等)都基于 AQS 构建。它通过“状态 + 队列”模型,将线程排队、阻塞、唤醒等复杂逻辑封装起来,让开发者只需关注资源获取/释放的规则。
本文将从源码级深入剖析 AQS 的实现机制,并介绍其在 JDK 中的核心应用,最后手写一个基于 AQS 的自定义锁,助你彻底掌握这一并发核心组件。
一、AQS 核心机制与源码实现详解
AQS 的设计哲学是:模板方法 + CAS + CLH 队列。我们逐层拆解其内部结构。
1.1 核心字段
java
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer {
// 同步状态:由子类定义语义(如 0=空闲, >0=占用)
private volatile int state;
// FIFO 双向同步队列的头尾指针
private transient volatile Node head;
private transient volatile Node tail;
// 获取/设置 state 的原子操作
protected final int getState() { return state; }
protected final void setState(int newState) { state = newState; }
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
✅
state是所有竞争的焦点,必须通过 CAS 操作保证原子性。
1.2 Node 节点结构
同步队列中的每个元素都是一个 Node,代表一个等待线程:
java
static final class Node {
static final Node SHARED = new Node(); // 共享模式标记
static final Node EXCLUSIVE = null; // 独占模式标记
// 等待状态
static final int CANCELLED = 1; // 已取消
static final int SIGNAL = -1; // 前驱需唤醒后继
static final int CONDITION = -2; // 在 Condition 队列中
static final int PROPAGATE = -3; // 共享模式需传播
volatile int waitStatus;
volatile Node prev; // 前驱
volatile Node next; // 后继
volatile Thread thread; // 关联线程
Node nextWaiter; // 模式标记(SHARED 或 null)
}
🔑 关键区别:
nextWaiter == SHARED表示共享模式,== null表示独占模式。
1.3 独占模式:获取资源流程(acquire)
java
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 1. 尝试获取(子类实现)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 2. 失败则入队阻塞
selfInterrupt(); // 恢复中断状态
}
步骤分解:
① 入队:addWaiter
java
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 快速路径
pred.next = node;
return node;
}
}
enq(node); // 自旋入队(慢路径)
return node;
}
② 阻塞等待:acquireQueued
java
final boolean acquireQueued(final Node node, int arg) {
for (;;) {
final Node p = node.predecessor();
// 仅当前驱是 head 时才尝试获取
if (p == head && tryAcquire(arg)) {
setHead(node); // 成功,设为新 head
p.next = null; // help GC
return interrupted;
}
// 决定是否阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
}
③ 是否阻塞:shouldParkAfterFailedAcquire
- 若前驱
waitStatus == SIGNAL→ 安全阻塞 - 若前驱已取消 → 跳过并清理
- 否则 → 将前驱设为
SIGNAL,下次再试
💡 精妙之处:只有前驱承诺唤醒(
SIGNAL),当前线程才会调用LockSupport.park()阻塞,避免无效唤醒。
1.4 释放资源:release
java
public final boolean release(int arg) {
if (tryRelease(arg)) { // 子类实现释放逻辑
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒后继
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 清除 SIGNAL
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 从 tail 向前找第一个有效节点(解决 next 未设置问题)
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
🌟 为什么从 tail 向前遍历?
因为enq中设置prev在 CAS 之前,但next在 CAS 之后,可能导致next为 null。从 tail 向前更安全。
1.5 共享模式 vs 独占模式
| 特性 | 独占模式 | 共享模式 |
|---|---|---|
| API | acquire/release |
acquireShared/releaseShared |
| Node 标记 | EXCLUSIVE (null) |
SHARED |
| 唤醒行为 | 仅唤醒一个后继 | 传播唤醒多个共享节点 |
| 核心方法 | tryAcquire/tryRelease |
tryAcquireShared/tryReleaseShared |
共享模式的关键:setHeadAndPropagate
java
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
// 如果还有资源(propagate > 0)或头节点状态为 PROPAGATE
if (propagate > 0 || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); // 继续唤醒后继共享节点!
}
}
✅ 这就是
Semaphore能同时唤醒多个线程的原因。
二、JDK 中基于 AQS 的核心类及其实现差异
| 类 | 模式 | state 含义 |
核心用途 |
|---|---|---|---|
ReentrantLock |
独占 | 锁重入次数 | 可重入互斥锁 |
ReentrantReadWriteLock.ReadLock |
共享 | 高16位:读锁计数 | 读多写少场景 |
ReentrantReadWriteLock.WriteLock |
独占 | 低16位:写锁重入次数 | 写操作互斥 |
Semaphore |
共享 | 剩余许可数量 | 控制并发访问数 |
CountDownLatch |
共享 | 倒计数器值 | 等待 N 个任务完成 |
FutureTask |
独占 | 执行状态(NEW/RUNNING/DONE) | 异步计算结果 |
实现差异举例:
ReentrantLock:state=0表示空闲;>0表示被占用,值为重入次数。Semaphore:state初始为许可数,每次acquire减 1,release加 1。CountDownLatch:state初始为 N,每次countDown减 1,归零时唤醒所有等待者。
📌 所有这些类都只重写了 AQS 的
tryXxx方法,排队逻辑完全由 AQS 提供。
三、手写一个基于 AQS 的不可重入互斥锁
下面实现一个简化版的 Mutex 锁,展示如何利用 AQS 构建自定义同步器。
java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Lock;
public class SimpleMutex implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int ignored) {
// CAS 将 state 从 0 → 1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int ignored) {
if (getExclusiveOwnerThread() != Thread.currentThread()) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0); // 释放锁
return true;
}
@Override
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
}
private final Sync sync = new Sync();
@Override public void lock() { sync.acquire(1); }
@Override public void unlock() { sync.release(1); }
@Override public boolean tryLock() { return sync.tryAcquire(1); }
// ... 其他方法略
}
测试验证互斥性:
java
public class MutexTest {
static int count = 0;
static SimpleMutex lock = new SimpleMutex();
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
lock.lock();
try { count++; } finally { lock.unlock(); }
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
lock.lock();
try { count++; } finally { lock.unlock(); }
}
});
t1.start(); t2.start();
t1.join(); t2.join();
System.out.println("Final count: " + count); // 必为 20000
}
}
✅ 该锁虽简单,但已具备完整的 AQS 能力:线程排队、阻塞、唤醒、异常处理。
四、总结
AQS 是 Java 并发包中最精妙的设计之一,其核心价值在于:
- 解耦:将“资源竞争规则”与“线程排队管理”分离。
- 高效:基于 CAS +
LockSupport实现无锁队列和低开销阻塞。 - 灵活:通过独占/共享模式支持各种同步语义。
- 可靠:处理了中断、超时、取消等复杂边界情况。
掌握 AQS,不仅能让你深入理解 ReentrantLock、Semaphore 等工具的原理,更能赋予你构建高性能自定义同步组件的能力。
📚 建议:结合 OpenJDK 源码阅读(如
ReentrantLock.Sync、Semaphore.Sync),体会大师级工程设计。
附:AQS 核心流程图(简化版)
scss
┌──────────────┐
│ tryAcquire() │
└──────┬───────┘
│
┌────────────────┼─────────────────┐
│ ▼ │
│ 成功?(true) │
│ │ │
│ ▼ │
│ 执行临界区 │
│ │ │
│ ▼ │
│ tryRelease() │
│ │ │
│ ▼ │
│ 唤醒 head.next 线程 │
│ │
▼ (false) ▼
┌─────────────┐ ┌──────────────────┐
│ addWaiter() │ │ release(arg) │
└──────┬──────┘ └─────────┬────────┘
│ │
▼ ▼
┌─────────────┐ ┌──────────────────┐
│acquireQueued│◄───────────────┤ unparkSuccessor()│
└──────┬──────┘ (被唤醒重试) └──────────────────┘
│
▼
┌──────────────────────-─┐
│shouldParkAfterFailed...│
└───────────┬──────────-─┘
│
▼
LockSupport.park()
│
▼
(被 unpark 唤醒)
掌握 AQS,你就掌握了 Java 并发的“任督二脉”。

