Skip to content

一、CountDownLatch 核心概念

CountDownLatch(倒计时门闩)是java.util.concurrent包中的同步工具,用于让一个或多个线程等待其他线程完成指定操作。其核心逻辑是:

  • 初始化计数器:构造时指定一个非负整数count,代表需要等待的“事件数”。

  • 等待事件完成:线程调用await()方法会阻塞,直到计数器减至0

  • 递减计数器:每个事件完成后,调用countDown()方法将计数器减1

  • 一次性:计数器一旦减至0,无法重置(若需循环使用,可改用CyclicBarrier)。

    二、基本使用示例

以下是一个典型场景:主线程等待5个子线程完成任务

java
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CountDownLatch latch = new CountDownLatch(threadCount); // 初始化计数器为5

        // 启动5个子线程
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " 开始执行任务");
                try {
                    Thread.sleep(1000); // 模拟任务耗时
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 完成任务");
                latch.countDown(); // 任务完成,计数器减1
            }).start();
        }

        System.out.println("主线程等待所有子线程完成...");
        latch.await(); // 阻塞主线程,直到计数器为0
        System.out.println("所有子线程完成,主线程继续执行");
    }
}

运行结果

主线程等待所有子线程完成...
Thread-0 开始执行任务
Thread-1 开始执行任务
Thread-2 开始执行任务
Thread-3 开始执行任务
Thread-4 开始执行任务
Thread-0 完成任务
Thread-1 完成任务
Thread-2 完成任务
Thread-3 完成任务
Thread-4 完成任务
所有子线程完成,主线程继续执行

三、源码深入剖析

CountDownLatch的实现完全依赖AbstractQueuedSynchronizer(AQS)——Java并发包的基础框架,通过状态变量state)和等待队列(双向链表)实现同步。

1. 类结构

CountDownLatch内部定义了一个静态内部类Sync,继承自AQSSync重写了AQStryAcquireShared(共享模式获取同步状态)和tryReleaseShared(共享模式释放同步状态)方法,这两个方法是CountDownLatch的核心逻辑。

java
public class CountDownLatch {
    private final Sync sync; // 核心同步器

    // 构造方法:初始化计数器(必须非负)
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    // 等待计数器减至0(可中断)
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // 等待计数器减至0(带超时)
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // 计数器减1
    public void countDown() {
        sync.releaseShared(1);
    }

    // 获取当前计数器值
    public long getCount() {
        return sync.getCount();
    }

    // 内部同步器(继承AQS)
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count); // 将计数器值赋值给AQS的state变量(volatile修饰,保证可见性)
        }

        long getCount() {
            return getState();
        }

        // 共享模式获取同步状态:判断计数器是否为0(是则允许通过,否则阻塞)
        @Override
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1; // 1表示成功,-1表示失败
        }

        // 共享模式释放同步状态:计数器减1(CAS保证原子性)
        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) { // 循环重试,直到CAS成功
                int c = getState();
                if (c == 0) return false; // 计数器已为0,无需操作
                int nextc = c - 1;
                if (compareAndSetState(c, nextc)) { // CAS修改state
                    return nextc == 0; // 若减至0,返回true(需要唤醒等待线程)
                }
            }
        }
    }
}

2. 核心方法解析

CountDownLatch的关键操作(await()countDown())均通过Sync调用AQS的方法实现,以下是对核心流程的拆解:

(1)await():等待计数器减至0

await()方法最终调用AQSacquireSharedInterruptibly(1),该方法是共享模式下可中断的同步状态获取。其流程如下:

mermaid
graph TD
    A[调用await()] --> B[sync.acquireSharedInterruptibly(1)]
    B --> C[检查线程是否中断?]
    C -->|是| D[抛出InterruptedException]
    C -->|否| E[调用tryAcquireShared(1)]
    E -->|返回1(state=0)| F[方法结束,线程继续执行]
    E -->|返回-1(state≠0)| G[进入等待队列,阻塞线程]

细节说明

  • tryAcquireShared(1)Sync重写的方法,逻辑简单:若state(计数器)为0,返回1(成功,允许线程通过);否则返回-1(失败,需阻塞)。

  • 进入等待队列:若tryAcquireShared返回-1AQS会将当前线程封装为Node(节点),加入双向等待队列CLH队列),并调用LockSupport.park()阻塞线程。

    (2)countDown():计数器减1

countDown()方法最终调用AQSreleaseShared(1),该方法是共享模式下同步状态释放。其流程如下:

mermaid
graph TD
    A[调用countDown()] --> B[sync.releaseShared(1)]
    B --> C[调用tryReleaseShared(1)]
    C -->|循环CAS减state| D[state是否减至0?]
    D -->|是| E[唤醒等待队列中的所有线程]
    D -->|否| F[方法结束,无操作]

细节说明

  • tryReleaseShared(1)Sync重写的方法,核心逻辑是循环CAS递减state(避免并发修改冲突)。若state减至0,返回true(需唤醒等待线程);否则返回false

  • 唤醒等待线程:当tryReleaseShared返回true时,AQS会调用doReleaseShared()方法,遍历等待队列,唤醒所有阻塞的线程(共享模式下,多个线程可同时被唤醒)。

    (3)AQS等待队列的唤醒逻辑

doReleaseShared()AQS中共享模式下的唤醒方法,其核心是传播唤醒信号(确保所有等待线程都被唤醒)。流程如下:

  1. 获取头节点:头节点是等待队列中的第一个线程。
  2. 检查头节点状态:若头节点状态为SIGNAL(表示需唤醒后继节点),则用CAS将其状态改为0,并调用LockSupport.unpark()唤醒后继节点。
  3. 传播唤醒信号:若头节点状态为0,则将其状态改为PROPAGATE(表示唤醒信号需传播),确保后续节点也能被唤醒。
  4. 循环检查:直到头节点无变化(避免并发修改导致的问题)。

代码片段(AQS的doReleaseShared())

java
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { // 头节点需唤醒后继
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // CAS失败,重试
                unparkSuccessor(h); // 唤醒后继节点
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // 将状态改为PROPAGATE,传播唤醒信号
        }
        if (h == head) // 头节点未变化,退出循环
            break;
    }
}

3. 关键细节总结

  • state变量AQSstate变量(volatile修饰)存储CountDownLatch的计数器值,保证多线程间的可见性。

  • 共享模式CountDownLatch使用AQS共享模式Node.SHARED),因为多个线程可同时等待,且计数器为0时需唤醒所有等待线程(独占模式仅唤醒一个线程)。

  • 可中断性await()方法是可中断的(通过acquireSharedInterruptibly实现),若线程在等待时被中断,会抛出InterruptedException并清除中断状态。

  • 一次性tryReleaseShared方法中,若state已为0,则直接返回false,不再修改state(计数器无法重置)。

    四、CountDownLatch 与其他同步工具的区别

为了更清晰地理解CountDownLatch,我们将其与CyclicBarrierSemaphore进行对比:

特性CountDownLatchCyclicBarrierSemaphore
计数器重置不可重置(一次性)可循环重置(多次使用)可重置(release()增加计数)
等待逻辑等待其他线程完成(一对多)等待所有参与线程到达 barrier(多对多)控制并发线程数(资源限制)
回调函数有(barrierAction,所有线程到达后执行)
使用场景主线程等待子线程完成初始化多线程协同完成同一任务(如分阶段计算)控制并发访问(如数据库连接池)

五、高级使用场景

CountDownLatch的应用场景非常广泛,以下是几个典型案例:

1. 资源初始化(服务器启动)

服务器启动时,需要等待数据库连接、缓存、消息队列等资源初始化完成,才能接受请求。此时可使用CountDownLatch

  • 初始化计数器为资源数量(如3)。

  • 每个资源初始化完成后调用countDown()

  • 主线程调用await()等待所有资源初始化完成。

    2. 并行计算(任务拆分)

将一个大任务拆分为多个子任务,并行计算后汇总结果:

  • 初始化计数器为子任务数量(如4)。

  • 每个子线程处理一个子任务,完成后将结果存入数组,并调用countDown()

  • 主线程等待所有子任务完成后,汇总数组中的结果。

    3. 测试场景(多线程结果验证)

测试多线程程序时,需要等待所有线程完成后检查结果:

  • 初始化计数器为线程数量。

  • 每个测试线程执行测试逻辑,完成后调用countDown()

  • 测试主线程等待所有线程完成后,断言结果是否符合预期。

    六、注意事项

  • 计数器非负:构造CountDownLatch时,count必须非负(否则抛出IllegalArgumentException)。

  • 避免死锁:确保所有调用countDown()的线程都能正常执行(如避免线程被中断或抛出未捕获异常),否则计数器无法减至0,等待线程将永远阻塞。

  • 超时机制:使用await(long timeout, TimeUnit unit)方法可避免无限等待(超时后返回false,线程继续执行)。

    七、总结

CountDownLatch基于AQS实现的共享模式同步工具,其核心逻辑是通过计数器等待队列实现线程间的等待-通知机制。关键源码细节包括:

  • Sync内部类重写tryAcquireShared(判断计数器是否为0)和tryReleaseShared(CAS递减计数器)。
  • await()通过AQSacquireSharedInterruptibly进入等待队列,阻塞线程。
  • countDown()通过AQSreleaseShared递减计数器,当计数器为0时唤醒所有等待线程。

掌握CountDownLatch的源码实现,不仅能更灵活地使用它,还能深入理解AQS的同步机制,为学习其他并发工具(如ReentrantLockSemaphore)打下基础。