Skip to content

AQS Condition源码分析

AQS的ConditionObject实现类似Objectwait/notify/notifyAll功能,但提供了更灵活的条件等待机制。

多条件队列支持

  • 每个ConditionObject维护独立的条件等待队列
  • 相比传统synchronized的单一监视器,支持更细粒度的线程等待控制

使用案例

java
// 典型使用示例
ReentrantLock lock = new ReentrantLock();
Condition notFull  = lock.newCondition(); 
Condition notEmpty = lock.newCondition();
// 生产者线程
lock.lock();
try {
    while (queue.isFull()) {
        notFull.await();
    }
    queue.put(item);
    notEmpty.signal();
} catch (InterruptedException e) {
    // 保留中断状态以便上层代码处理
    Thread.currentThread().interrupt();
    // 可选:添加异常处理逻辑如日志记录
} finally {
    lock.unlock();
}


// 消费者线程
lock.lock();
try {
while (queue.isEmpty()) {
     notEmpty.await();
 }
 // 消费元素
 Item item = queue.take();
 // 唤醒可能被阻塞的生产者线程
 notFull.signal();
 return item;
}catch (InterruptedException e) {
 Thread.currentThread().interrupt();
return null;
}finally {
 lock.unlock();
}
  • await():线程进入等待状态,释放锁
  • signal():唤醒一个等待线程
  • signalAll():唤醒所有等待线程

实现原理

  • 条件队列结构:每个条件队列维护firstWaiterlastWaiter指针
  • 节点状态转换
    1. 线程调用await()时:
      • 原子释放同步状态
      • 当前线程封装为Node加入条件队列
      • 阻塞线程直至被唤醒或中断
    2. 调用signal()时:
      • 将条件队列首节点转移至同步队列
      • 等待后续的锁获取操作

关键方法源码解析

  • await()
    • 执行流程
      1. 检查中断状态:通过checkInterruptWhileWaiting()检测线程初始化中断状态
      2. 创建条件节点
        java
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        // 将节点添加到条件队列尾部
        if (compareAndSetWaitStatus(Node.CONDITION, 0)) {
            enq(node);
        }
      3. 完全释放同步状态:调用release()方法原子释放锁
      4. 循环检测队列归属
        java
        while (!isOnSyncQueue(node)) {
            // 阻塞线程直到被转移到同步队列
            LockSupport.park(this);
            // 处理中断情况
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
      5. 竞争获取锁:通过acquireQueued()方法重新获取锁
      6. 清理取消节点:移除条件队列中被取消的节点
    • 变体支持
      • awaitUninterruptibly()
        java
        while (!isOnSyncQueue(node)) {
            if (Thread.interrupted())
                throw new InterruptedException();
            LockSupport.park(this);
        }
      • awaitNanos(long nanosTimeout)
        java
        final long deadline = System.nanoTime() + nanosTimeout;
        while (true) {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 计算剩余等待时间
            if ((nanosTimeout = deadline - System.nanoTime()) <= 0L) {
                // 超时后直接返回false
                transferAfterCancelledWait(node);
                return false;
            }
            if (nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
        }
  • signal()
    • 执行流程
      1. 校验锁持有状态:通过tryRelease()检查当前线程是否持有锁
      2. 获取首节点:获取条件队列的firstWaiter
      3. 转移节点至同步队列
        java
        // 将条件节点转移到同步队列
        if (!transferForSignal(firstWaiter)) {
            // 如果转移失败,继续处理后续节点
            doSignal(firstWaiter);
        }
    • 节点处理
      • 更新队列指针:通过CAS操作更新firstWaiterlastWaiter
      • 断开原连接:将节点从条件队列中移除
      • 触发唤醒机制:
        java
        // 唤醒指定节点的线程
        unparkSuccessor(node);
  • signalAll()
    • 执行流程
      java
      // 将所有等待节点转移到同步队列
      while (firstWaiter != null) {
          transferForSignal(firstWaiter);
          // 更新头节点指针
          if ((next = firstWaiter.nextWaiter) == null) {
              lastWaiter = next;
          }
          firstWaiter = next;
      }
    • 批量转移优化
      • 减少锁竞争:通过单次遍历完成所有节点转移
      • 避免重复唤醒:每个节点仅触发一次唤醒操作
  1. 与传统监视器对比优势
    特性Object监视器ConditionObject
    条件队列数量每个对象1个每个Lock可有多个
    精确唤醒不支持支持指定条件队列唤醒
    响应中断能力不可中断支持中断处理
    超时控制仅支持毫秒级纳米级精度
    与锁的绑定关系与synchronized强制绑定可与任意AQS实现类配合