Skip to content

一、CyclicBarrier 核心概念

CyclicBarrierjava.util.concurrent包中的同步工具,用于让一组线程互相等待,直到所有线程都到达某个屏障点(Barrier Point),之后才能继续执行。其核心特点是:

  1. 循环性(Cyclic):屏障触发后(所有线程到达),会自动重置状态,可重复使用(区别于CountDownLatch的一次性)。
  2. 互相等待:所有参与线程(parties)必须全部到达,否则均阻塞。
  3. 屏障动作(Barrier Action):可选,当所有线程到达后,由最后一个到达的线程执行一个指定的Runnable任务(如合并结果、打印日志)。

二、基本使用示例

假设我们有一个多线程数据计算场景:3个线程分别计算数据片段,完成后等待所有线程结束,再合并结果。

java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        // 1. 初始化CyclicBarrier:3个参与线程,屏障动作(合并结果)
        CyclicBarrier barrier = new CyclicBarrier(3, () -> 
            System.out.println("=== 所有线程计算完成,开始合并结果 ===")
        );

        ExecutorService executor = Executors.newFixedThreadPool(3);
        // 2. 提交3个计算任务
        for (int i = 1; i <= 3; i++) {
            int threadId = i;
            executor.submit(() -> {
                try {
                    System.out.println("线程" + threadId + ":开始计算数据");
                    Thread.sleep((long) (Math.random() * 1000)); // 模拟计算耗时
                    System.out.println("线程" + threadId + ":计算完成,等待其他线程");
                    // 3. 到达屏障点,阻塞等待
                    int index = barrier.await(); // 返回当前线程的到达顺序(0表示最后一个)
                    System.out.println("线程" + threadId + ":继续执行(到达顺序:" + index + ")");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
    }
}

运行结果(顺序可能不同,但逻辑一致):

线程1:开始计算数据
线程2:开始计算数据
线程3:开始计算数据
线程2:计算完成,等待其他线程
线程1:计算完成,等待其他线程
线程3:计算完成,等待其他线程
=== 所有线程计算完成,开始合并结果 ===
线程3:继续执行(到达顺序:0) // 最后一个到达,执行屏障动作
线程1:继续执行(到达顺序:1)
线程2:继续执行(到达顺序:2)

三、源码深入剖析(JDK 8)

CyclicBarrier的实现依赖**ReentrantLock(保证同步)Condition(等待/通知机制)**,核心成员变量如下:

1. 核心成员变量

java
public class CyclicBarrier {
    // 1. 同步锁(保证操作原子性)
    private final ReentrantLock lock = new ReentrantLock();
    // 2. 条件变量(用于线程等待)
    private final Condition trip = lock.newCondition();
    // 3. 参与线程数(必须>0)
    private final int parties;
    // 4. 屏障动作(所有线程到达后执行)
    private final Runnable barrierCommand;
    // 5. 当前代(Generation):用于实现循环性,每轮屏障对应一个Generation
    private Generation generation = new Generation();
    // 6. 当前等待线程数(初始=parties,每到一个线程减1)
    private int count;

    // 静态内部类:表示屏障的"代",用于区分不同轮次的屏障
    private static class Generation {
        boolean broken = false; // 屏障是否被破坏(如线程中断、超时)
    }
}
  • Generation(代):是CyclicBarrier循环性的关键。每轮屏障触发后,会生成新的Generation,重置countparties,从而实现重复使用。

  • count:当前等待的线程数,初始等于parties。每调用一次await()count减1;当count=0时,触发屏障动作。

    2. 构造函数

CyclicBarrier有两个构造函数:

  • CyclicBarrier(int parties):仅指定参与线程数。
  • CyclicBarrier(int parties, Runnable barrierCommand):指定参与线程数和屏障动作。
java
public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierCommand) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties; // 初始count=parties
    this.barrierCommand = barrierCommand;
}

3. 核心方法:await()

await()CyclicBarrier的核心方法,用于让当前线程到达屏障点并等待。其重载版本包括:

  • await():无限等待,直到所有线程到达或屏障被破坏。
  • await(long timeout, TimeUnit unit):超时等待,超时后抛出TimeoutException

await()的底层实现是dowait()方法,我们重点分析dowait()的逻辑:

java
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock(); // 1. 获取锁,保证原子性
    try {
        final Generation g = generation; // 2. 记录当前代(避免跨代问题)

        // 3. 检查当前代是否被破坏(如线程中断、超时)
        if (g.broken) {
            throw new BrokenBarrierException();
        }

        // 4. 检查当前线程是否被中断
        if (Thread.interrupted()) {
            breakBarrier(); // 破坏屏障(设置g.broken=true,唤醒所有线程)
            throw new InterruptedException();
        }

        // 5. 减少等待线程数(count--)
        int index = --count;
        if (index == 0) { // 6. 最后一个线程到达(触发屏障)
            boolean ranAction = false;
            try {
                // 执行屏障动作(由最后一个线程执行)
                final Runnable command = barrierCommand;
                if (command != null) {
                    command.run();
                }
                ranAction = true;
                // 进入下一轮屏障(重置count、生成新代)
                nextGeneration();
                return 0; // 返回0,表示当前线程是最后一个到达的
            } finally {
                // 如果屏障动作抛出异常,破坏屏障(避免其他线程无限等待)
                if (!ranAction) {
                    breakBarrier();
                }
            }
        }

        // 7. 非最后一个线程:循环等待(直到被唤醒、中断、超时或屏障被破坏)
        for (;;) {
            try {
                // 根据是否超时,选择等待方式
                if (!timed) {
                    trip.await(); // 无限等待
                } else if (nanos > 0L) {
                    nanos = trip.awaitNanos(nanos); // 超时等待
                }
            } catch (InterruptedException ie) {
                // 8. 等待过程中被中断
                if (g == generation && !g.broken) {
                    breakBarrier(); // 破坏屏障
                    throw ie; // 抛出中断异常
                } else {
                    // 已经是新的代或屏障已破坏,只需重新中断当前线程
                    Thread.currentThread().interrupt();
                }
            }

            // 9. 唤醒后检查屏障状态
            if (g.broken) {
                throw new BrokenBarrierException(); // 屏障被破坏,抛出异常
            }
            if (g != generation) {
                return index; // 进入新的代(屏障已触发),返回当前线程的到达顺序
            }
            // 10. 超时处理
            if (timed && nanos <= 0L) {
                breakBarrier(); // 破坏屏障
                throw new TimeoutException(); // 抛出超时异常
            }
        }
    } finally {
        lock.unlock(); // 释放锁
    }
}

4. 关键辅助方法

  • nextGeneration():触发屏障后,重置状态以进入下一轮。

    java
    private void nextGeneration() {
        trip.signalAll(); // 唤醒所有等待的线程
        count = parties; // 重置count为初始值
        generation = new Generation(); // 生成新的代(循环性的关键)
    }
  • breakBarrier():破坏屏障(如线程中断、超时),唤醒所有线程并标记当前代为broken

    java
    private void breakBarrier() {
        generation.broken = true; // 标记当前代被破坏
        count = parties; // 重置count(避免后续线程无限等待)
        trip.signalAll(); // 唤醒所有等待的线程
    }
  • reset():手动重置屏障到初始状态(会破坏当前代,唤醒所有等待线程)。

    java
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier(); // 破坏当前代
            nextGeneration(); // 生成新的代
        } finally {
            lock.unlock();
        }
    }

    四、CyclicBarrier 与 CountDownLatch 的区别

特性CyclicBarrierCountDownLatch
循环性可循环使用(nextGeneration()一次性(count减到0后无法重置)
等待逻辑所有线程互相等待(均需到达屏障)一个/多个线程等待其他线程完成
屏障动作有(最后一个线程执行)
count 变化由参与线程调用await()递减由其他线程调用countDown()递减
异常处理破坏屏障(broken标记),所有线程抛出BrokenBarrierException无特殊异常处理(中断会抛出InterruptedException

五、常见问题与注意事项

  1. 屏障动作的异常处理
    屏障动作(barrierCommand)如果抛出异常,会导致breakBarrier()被调用,所有线程抛出BrokenBarrierException。因此,屏障动作中的代码必须捕获异常确保无异常

  2. 线程中断的影响
    任何线程在await()期间被中断,都会触发breakBarrier(),导致所有线程抛出BrokenBarrierException。如果需要处理中断,应在catch (InterruptedException)中手动处理。

  3. 超时的影响
    使用await(timeout, unit)时,若超时,会触发breakBarrier(),所有线程抛出TimeoutExceptionBrokenBarrierException。需合理设置超时时间,避免不必要的屏障破坏。

  4. parties 的取值
    parties必须大于0,否则构造函数抛出IllegalArgumentException

六、使用场景总结

CyclicBarrier适用于需要多个线程同步到达某个点后再继续执行的场景,典型案例包括:

  • 多线程数据合并:如分布式计算中,多个线程计算数据片段,等待所有线程完成后合并结果。

  • 并发测试:如模拟100个用户同时登录,确保所有用户线程同时开始执行登录逻辑。

  • 游戏关卡加载:如多人游戏中,所有玩家加载完当前关卡后,才能进入下一关。

    七、总结

CyclicBarrier是一个循环性互相等待的同步工具,其核心是通过ReentrantLockCondition实现线程的等待/通知,通过Generation实现循环性。深入理解其源码中的代机制屏障动作异常处理,能帮助我们更好地使用它,避免出现同步问题。

CountDownLatch相比,CyclicBarrier更适合需要重复同步的场景,而CountDownLatch更适合一次性等待的场景。在实际开发中,需根据具体需求选择合适的同步工具。