Appearance
一、CountDownLatch 核心概念
CountDownLatch
(倒计时门闩)是java.util.concurrent
包中的同步工具,用于让一个或多个线程等待其他线程完成指定操作。其核心逻辑是:
初始化计数器:构造时指定一个非负整数
count
,代表需要等待的“事件数”。等待事件完成:线程调用
await()
方法会阻塞,直到计数器减至0
。递减计数器:每个事件完成后,调用
countDown()
方法将计数器减1
。一次性:计数器一旦减至
0
,无法重置(若需循环使用,可改用CyclicBarrier
)。二、基本使用示例
以下是一个典型场景:主线程等待5个子线程完成任务。
java
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount); // 初始化计数器为5
// 启动5个子线程
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 开始执行任务");
try {
Thread.sleep(1000); // 模拟任务耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 完成任务");
latch.countDown(); // 任务完成,计数器减1
}).start();
}
System.out.println("主线程等待所有子线程完成...");
latch.await(); // 阻塞主线程,直到计数器为0
System.out.println("所有子线程完成,主线程继续执行");
}
}
运行结果:
主线程等待所有子线程完成...
Thread-0 开始执行任务
Thread-1 开始执行任务
Thread-2 开始执行任务
Thread-3 开始执行任务
Thread-4 开始执行任务
Thread-0 完成任务
Thread-1 完成任务
Thread-2 完成任务
Thread-3 完成任务
Thread-4 完成任务
所有子线程完成,主线程继续执行
三、源码深入剖析
CountDownLatch
的实现完全依赖AbstractQueuedSynchronizer
(AQS)——Java并发包的基础框架,通过状态变量(state
)和等待队列(双向链表)实现同步。
1. 类结构
CountDownLatch
内部定义了一个静态内部类Sync
,继承自AQS
。Sync
重写了AQS
的tryAcquireShared
(共享模式获取同步状态)和tryReleaseShared
(共享模式释放同步状态)方法,这两个方法是CountDownLatch
的核心逻辑。
java
public class CountDownLatch {
private final Sync sync; // 核心同步器
// 构造方法:初始化计数器(必须非负)
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 等待计数器减至0(可中断)
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 等待计数器减至0(带超时)
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 计数器减1
public void countDown() {
sync.releaseShared(1);
}
// 获取当前计数器值
public long getCount() {
return sync.getCount();
}
// 内部同步器(继承AQS)
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count); // 将计数器值赋值给AQS的state变量(volatile修饰,保证可见性)
}
long getCount() {
return getState();
}
// 共享模式获取同步状态:判断计数器是否为0(是则允许通过,否则阻塞)
@Override
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; // 1表示成功,-1表示失败
}
// 共享模式释放同步状态:计数器减1(CAS保证原子性)
@Override
protected boolean tryReleaseShared(int releases) {
for (;;) { // 循环重试,直到CAS成功
int c = getState();
if (c == 0) return false; // 计数器已为0,无需操作
int nextc = c - 1;
if (compareAndSetState(c, nextc)) { // CAS修改state
return nextc == 0; // 若减至0,返回true(需要唤醒等待线程)
}
}
}
}
}
2. 核心方法解析
CountDownLatch
的关键操作(await()
、countDown()
)均通过Sync
调用AQS
的方法实现,以下是对核心流程的拆解:
(1)await():等待计数器减至0
await()
方法最终调用AQS
的acquireSharedInterruptibly(1)
,该方法是共享模式下可中断的同步状态获取。其流程如下:
mermaid
graph TD
A[调用await()] --> B[sync.acquireSharedInterruptibly(1)]
B --> C[检查线程是否中断?]
C -->|是| D[抛出InterruptedException]
C -->|否| E[调用tryAcquireShared(1)]
E -->|返回1(state=0)| F[方法结束,线程继续执行]
E -->|返回-1(state≠0)| G[进入等待队列,阻塞线程]
细节说明:
tryAcquireShared(1)
:Sync
重写的方法,逻辑简单:若state
(计数器)为0
,返回1
(成功,允许线程通过);否则返回-1
(失败,需阻塞)。进入等待队列:若
tryAcquireShared
返回-1
,AQS
会将当前线程封装为Node
(节点),加入双向等待队列(CLH
队列),并调用LockSupport.park()
阻塞线程。(2)countDown():计数器减1
countDown()
方法最终调用AQS
的releaseShared(1)
,该方法是共享模式下同步状态释放。其流程如下:
mermaid
graph TD
A[调用countDown()] --> B[sync.releaseShared(1)]
B --> C[调用tryReleaseShared(1)]
C -->|循环CAS减state| D[state是否减至0?]
D -->|是| E[唤醒等待队列中的所有线程]
D -->|否| F[方法结束,无操作]
细节说明:
tryReleaseShared(1)
:Sync
重写的方法,核心逻辑是循环CAS递减state
(避免并发修改冲突)。若state
减至0
,返回true
(需唤醒等待线程);否则返回false
。唤醒等待线程:当
tryReleaseShared
返回true
时,AQS
会调用doReleaseShared()
方法,遍历等待队列,唤醒所有阻塞的线程(共享模式下,多个线程可同时被唤醒)。(3)AQS等待队列的唤醒逻辑
doReleaseShared()
是AQS
中共享模式下的唤醒方法,其核心是传播唤醒信号(确保所有等待线程都被唤醒)。流程如下:
- 获取头节点:头节点是等待队列中的第一个线程。
- 检查头节点状态:若头节点状态为
SIGNAL
(表示需唤醒后继节点),则用CAS将其状态改为0
,并调用LockSupport.unpark()
唤醒后继节点。 - 传播唤醒信号:若头节点状态为
0
,则将其状态改为PROPAGATE
(表示唤醒信号需传播),确保后续节点也能被唤醒。 - 循环检查:直到头节点无变化(避免并发修改导致的问题)。
代码片段(AQS的doReleaseShared()):
java
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 头节点需唤醒后继
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // CAS失败,重试
unparkSuccessor(h); // 唤醒后继节点
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // 将状态改为PROPAGATE,传播唤醒信号
}
if (h == head) // 头节点未变化,退出循环
break;
}
}
3. 关键细节总结
state
变量:AQS
的state
变量(volatile
修饰)存储CountDownLatch
的计数器值,保证多线程间的可见性。共享模式:
CountDownLatch
使用AQS
的共享模式(Node.SHARED
),因为多个线程可同时等待,且计数器为0
时需唤醒所有等待线程(独占模式仅唤醒一个线程)。可中断性:
await()
方法是可中断的(通过acquireSharedInterruptibly
实现),若线程在等待时被中断,会抛出InterruptedException
并清除中断状态。一次性:
tryReleaseShared
方法中,若state
已为0
,则直接返回false
,不再修改state
(计数器无法重置)。四、CountDownLatch 与其他同步工具的区别
为了更清晰地理解CountDownLatch
,我们将其与CyclicBarrier
、Semaphore
进行对比:
特性 | CountDownLatch | CyclicBarrier | Semaphore |
---|---|---|---|
计数器重置 | 不可重置(一次性) | 可循环重置(多次使用) | 可重置(release() 增加计数) |
等待逻辑 | 等待其他线程完成(一对多) | 等待所有参与线程到达 barrier(多对多) | 控制并发线程数(资源限制) |
回调函数 | 无 | 有(barrierAction ,所有线程到达后执行) | 无 |
使用场景 | 主线程等待子线程完成初始化 | 多线程协同完成同一任务(如分阶段计算) | 控制并发访问(如数据库连接池) |
五、高级使用场景
CountDownLatch
的应用场景非常广泛,以下是几个典型案例:
1. 资源初始化(服务器启动)
服务器启动时,需要等待数据库连接、缓存、消息队列等资源初始化完成,才能接受请求。此时可使用CountDownLatch
:
初始化计数器为资源数量(如
3
)。每个资源初始化完成后调用
countDown()
。主线程调用
await()
等待所有资源初始化完成。2. 并行计算(任务拆分)
将一个大任务拆分为多个子任务,并行计算后汇总结果:
初始化计数器为子任务数量(如
4
)。每个子线程处理一个子任务,完成后将结果存入数组,并调用
countDown()
。主线程等待所有子任务完成后,汇总数组中的结果。
3. 测试场景(多线程结果验证)
测试多线程程序时,需要等待所有线程完成后检查结果:
初始化计数器为线程数量。
每个测试线程执行测试逻辑,完成后调用
countDown()
。测试主线程等待所有线程完成后,断言结果是否符合预期。
六、注意事项
计数器非负:构造
CountDownLatch
时,count
必须非负(否则抛出IllegalArgumentException
)。避免死锁:确保所有调用
countDown()
的线程都能正常执行(如避免线程被中断或抛出未捕获异常),否则计数器无法减至0
,等待线程将永远阻塞。超时机制:使用
await(long timeout, TimeUnit unit)
方法可避免无限等待(超时后返回false
,线程继续执行)。七、总结
CountDownLatch
是基于AQS实现的共享模式同步工具,其核心逻辑是通过计数器和等待队列实现线程间的等待-通知机制。关键源码细节包括:
Sync
内部类重写tryAcquireShared
(判断计数器是否为0
)和tryReleaseShared
(CAS递减计数器)。await()
通过AQS
的acquireSharedInterruptibly
进入等待队列,阻塞线程。countDown()
通过AQS
的releaseShared
递减计数器,当计数器为0
时唤醒所有等待线程。
掌握CountDownLatch
的源码实现,不仅能更灵活地使用它,还能深入理解AQS
的同步机制,为学习其他并发工具(如ReentrantLock
、Semaphore
)打下基础。