Appearance
AQS Condition源码分析
AQS的ConditionObject
实现类似Object
的wait/notify/notifyAll
功能,但提供了更灵活的条件等待机制。
多条件队列支持
- 每个
ConditionObject
维护独立的条件等待队列 - 相比传统
synchronized
的单一监视器,支持更细粒度的线程等待控制
使用案例
java
// 典型使用示例
ReentrantLock lock = new ReentrantLock();
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();
// 生产者线程
lock.lock();
try {
while (queue.isFull()) {
notFull.await();
}
queue.put(item);
notEmpty.signal();
} catch (InterruptedException e) {
// 保留中断状态以便上层代码处理
Thread.currentThread().interrupt();
// 可选:添加异常处理逻辑如日志记录
} finally {
lock.unlock();
}
// 消费者线程
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await();
}
// 消费元素
Item item = queue.take();
// 唤醒可能被阻塞的生产者线程
notFull.signal();
return item;
}catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}finally {
lock.unlock();
}
await()
:线程进入等待状态,释放锁signal()
:唤醒一个等待线程signalAll()
:唤醒所有等待线程
实现原理
- 条件队列结构:每个条件队列维护
firstWaiter
和lastWaiter
指针 - 节点状态转换:
- 线程调用
await()
时:- 原子释放同步状态
- 当前线程封装为
Node
加入条件队列 - 阻塞线程直至被唤醒或中断
- 调用
signal()
时:- 将条件队列首节点转移至同步队列
- 等待后续的锁获取操作
- 线程调用
关键方法源码解析
await()
:- 执行流程:
- 检查中断状态:通过
checkInterruptWhileWaiting()
检测线程初始化中断状态 - 创建条件节点:java
Node node = new Node(Thread.currentThread(), Node.CONDITION); // 将节点添加到条件队列尾部 if (compareAndSetWaitStatus(Node.CONDITION, 0)) { enq(node); }
- 完全释放同步状态:调用
release()
方法原子释放锁 - 循环检测队列归属:java
while (!isOnSyncQueue(node)) { // 阻塞线程直到被转移到同步队列 LockSupport.park(this); // 处理中断情况 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
- 竞争获取锁:通过
acquireQueued()
方法重新获取锁 - 清理取消节点:移除条件队列中被取消的节点
- 检查中断状态:通过
- 变体支持:
awaitUninterruptibly()
:javawhile (!isOnSyncQueue(node)) { if (Thread.interrupted()) throw new InterruptedException(); LockSupport.park(this); }
awaitNanos(long nanosTimeout)
:javafinal long deadline = System.nanoTime() + nanosTimeout; while (true) { if (Thread.interrupted()) throw new InterruptedException(); // 计算剩余等待时间 if ((nanosTimeout = deadline - System.nanoTime()) <= 0L) { // 超时后直接返回false transferAfterCancelledWait(node); return false; } if (nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); }
- 执行流程:
signal()
:- 执行流程:
- 校验锁持有状态:通过
tryRelease()
检查当前线程是否持有锁 - 获取首节点:获取条件队列的
firstWaiter
- 转移节点至同步队列:java
// 将条件节点转移到同步队列 if (!transferForSignal(firstWaiter)) { // 如果转移失败,继续处理后续节点 doSignal(firstWaiter); }
- 校验锁持有状态:通过
- 节点处理:
- 更新队列指针:通过CAS操作更新
firstWaiter
和lastWaiter
- 断开原连接:将节点从条件队列中移除
- 触发唤醒机制:java
// 唤醒指定节点的线程 unparkSuccessor(node);
- 更新队列指针:通过CAS操作更新
- 执行流程:
signalAll()
:- 执行流程:java
// 将所有等待节点转移到同步队列 while (firstWaiter != null) { transferForSignal(firstWaiter); // 更新头节点指针 if ((next = firstWaiter.nextWaiter) == null) { lastWaiter = next; } firstWaiter = next; }
- 批量转移优化:
- 减少锁竞争:通过单次遍历完成所有节点转移
- 避免重复唤醒:每个节点仅触发一次唤醒操作
- 执行流程:
- 与传统监视器对比优势
特性 Object监视器 ConditionObject 条件队列数量 每个对象1个 每个Lock可有多个 精确唤醒 不支持 支持指定条件队列唤醒 响应中断能力 不可中断 支持中断处理 超时控制 仅支持毫秒级 纳米级精度 与锁的绑定关系 与synchronized强制绑定 可与任意AQS实现类配合