Appearance
一、CyclicBarrier 核心概念
CyclicBarrier
是java.util.concurrent
包中的同步工具,用于让一组线程互相等待,直到所有线程都到达某个屏障点(Barrier Point),之后才能继续执行。其核心特点是:
- 循环性(Cyclic):屏障触发后(所有线程到达),会自动重置状态,可重复使用(区别于
CountDownLatch
的一次性)。 - 互相等待:所有参与线程(
parties
)必须全部到达,否则均阻塞。 - 屏障动作(Barrier Action):可选,当所有线程到达后,由最后一个到达的线程执行一个指定的
Runnable
任务(如合并结果、打印日志)。
二、基本使用示例
假设我们有一个多线程数据计算场景:3个线程分别计算数据片段,完成后等待所有线程结束,再合并结果。
java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 1. 初始化CyclicBarrier:3个参与线程,屏障动作(合并结果)
CyclicBarrier barrier = new CyclicBarrier(3, () ->
System.out.println("=== 所有线程计算完成,开始合并结果 ===")
);
ExecutorService executor = Executors.newFixedThreadPool(3);
// 2. 提交3个计算任务
for (int i = 1; i <= 3; i++) {
int threadId = i;
executor.submit(() -> {
try {
System.out.println("线程" + threadId + ":开始计算数据");
Thread.sleep((long) (Math.random() * 1000)); // 模拟计算耗时
System.out.println("线程" + threadId + ":计算完成,等待其他线程");
// 3. 到达屏障点,阻塞等待
int index = barrier.await(); // 返回当前线程的到达顺序(0表示最后一个)
System.out.println("线程" + threadId + ":继续执行(到达顺序:" + index + ")");
} catch (Exception e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
运行结果(顺序可能不同,但逻辑一致):
线程1:开始计算数据
线程2:开始计算数据
线程3:开始计算数据
线程2:计算完成,等待其他线程
线程1:计算完成,等待其他线程
线程3:计算完成,等待其他线程
=== 所有线程计算完成,开始合并结果 ===
线程3:继续执行(到达顺序:0) // 最后一个到达,执行屏障动作
线程1:继续执行(到达顺序:1)
线程2:继续执行(到达顺序:2)
三、源码深入剖析(JDK 8)
CyclicBarrier
的实现依赖**ReentrantLock
(保证同步)和Condition
(等待/通知机制)**,核心成员变量如下:
1. 核心成员变量
java
public class CyclicBarrier {
// 1. 同步锁(保证操作原子性)
private final ReentrantLock lock = new ReentrantLock();
// 2. 条件变量(用于线程等待)
private final Condition trip = lock.newCondition();
// 3. 参与线程数(必须>0)
private final int parties;
// 4. 屏障动作(所有线程到达后执行)
private final Runnable barrierCommand;
// 5. 当前代(Generation):用于实现循环性,每轮屏障对应一个Generation
private Generation generation = new Generation();
// 6. 当前等待线程数(初始=parties,每到一个线程减1)
private int count;
// 静态内部类:表示屏障的"代",用于区分不同轮次的屏障
private static class Generation {
boolean broken = false; // 屏障是否被破坏(如线程中断、超时)
}
}
Generation
(代):是CyclicBarrier
循环性的关键。每轮屏障触发后,会生成新的Generation
,重置count
为parties
,从而实现重复使用。count
:当前等待的线程数,初始等于parties
。每调用一次await()
,count
减1;当count
=0时,触发屏障动作。2. 构造函数
CyclicBarrier
有两个构造函数:
CyclicBarrier(int parties)
:仅指定参与线程数。CyclicBarrier(int parties, Runnable barrierCommand)
:指定参与线程数和屏障动作。
java
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierCommand) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties; // 初始count=parties
this.barrierCommand = barrierCommand;
}
3. 核心方法:await()
await()
是CyclicBarrier
的核心方法,用于让当前线程到达屏障点并等待。其重载版本包括:
await()
:无限等待,直到所有线程到达或屏障被破坏。await(long timeout, TimeUnit unit)
:超时等待,超时后抛出TimeoutException
。
await()
的底层实现是dowait()
方法,我们重点分析dowait()
的逻辑:
java
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); // 1. 获取锁,保证原子性
try {
final Generation g = generation; // 2. 记录当前代(避免跨代问题)
// 3. 检查当前代是否被破坏(如线程中断、超时)
if (g.broken) {
throw new BrokenBarrierException();
}
// 4. 检查当前线程是否被中断
if (Thread.interrupted()) {
breakBarrier(); // 破坏屏障(设置g.broken=true,唤醒所有线程)
throw new InterruptedException();
}
// 5. 减少等待线程数(count--)
int index = --count;
if (index == 0) { // 6. 最后一个线程到达(触发屏障)
boolean ranAction = false;
try {
// 执行屏障动作(由最后一个线程执行)
final Runnable command = barrierCommand;
if (command != null) {
command.run();
}
ranAction = true;
// 进入下一轮屏障(重置count、生成新代)
nextGeneration();
return 0; // 返回0,表示当前线程是最后一个到达的
} finally {
// 如果屏障动作抛出异常,破坏屏障(避免其他线程无限等待)
if (!ranAction) {
breakBarrier();
}
}
}
// 7. 非最后一个线程:循环等待(直到被唤醒、中断、超时或屏障被破坏)
for (;;) {
try {
// 根据是否超时,选择等待方式
if (!timed) {
trip.await(); // 无限等待
} else if (nanos > 0L) {
nanos = trip.awaitNanos(nanos); // 超时等待
}
} catch (InterruptedException ie) {
// 8. 等待过程中被中断
if (g == generation && !g.broken) {
breakBarrier(); // 破坏屏障
throw ie; // 抛出中断异常
} else {
// 已经是新的代或屏障已破坏,只需重新中断当前线程
Thread.currentThread().interrupt();
}
}
// 9. 唤醒后检查屏障状态
if (g.broken) {
throw new BrokenBarrierException(); // 屏障被破坏,抛出异常
}
if (g != generation) {
return index; // 进入新的代(屏障已触发),返回当前线程的到达顺序
}
// 10. 超时处理
if (timed && nanos <= 0L) {
breakBarrier(); // 破坏屏障
throw new TimeoutException(); // 抛出超时异常
}
}
} finally {
lock.unlock(); // 释放锁
}
}
4. 关键辅助方法
nextGeneration()
:触发屏障后,重置状态以进入下一轮。javaprivate void nextGeneration() { trip.signalAll(); // 唤醒所有等待的线程 count = parties; // 重置count为初始值 generation = new Generation(); // 生成新的代(循环性的关键) }
breakBarrier()
:破坏屏障(如线程中断、超时),唤醒所有线程并标记当前代为broken
。javaprivate void breakBarrier() { generation.broken = true; // 标记当前代被破坏 count = parties; // 重置count(避免后续线程无限等待) trip.signalAll(); // 唤醒所有等待的线程 }
reset()
:手动重置屏障到初始状态(会破坏当前代,唤醒所有等待线程)。javapublic void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // 破坏当前代 nextGeneration(); // 生成新的代 } finally { lock.unlock(); } }
四、CyclicBarrier 与 CountDownLatch 的区别
特性 | CyclicBarrier | CountDownLatch |
---|---|---|
循环性 | 可循环使用(nextGeneration() ) | 一次性(count减到0后无法重置) |
等待逻辑 | 所有线程互相等待(均需到达屏障) | 一个/多个线程等待其他线程完成 |
屏障动作 | 有(最后一个线程执行) | 无 |
count 变化 | 由参与线程调用await() 递减 | 由其他线程调用countDown() 递减 |
异常处理 | 破坏屏障(broken 标记),所有线程抛出BrokenBarrierException | 无特殊异常处理(中断会抛出InterruptedException ) |
五、常见问题与注意事项
屏障动作的异常处理:
屏障动作(barrierCommand
)如果抛出异常,会导致breakBarrier()
被调用,所有线程抛出BrokenBarrierException
。因此,屏障动作中的代码必须捕获异常或确保无异常。线程中断的影响:
任何线程在await()
期间被中断,都会触发breakBarrier()
,导致所有线程抛出BrokenBarrierException
。如果需要处理中断,应在catch (InterruptedException)
中手动处理。超时的影响:
使用await(timeout, unit)
时,若超时,会触发breakBarrier()
,所有线程抛出TimeoutException
或BrokenBarrierException
。需合理设置超时时间,避免不必要的屏障破坏。parties 的取值:
parties
必须大于0,否则构造函数抛出IllegalArgumentException
。
六、使用场景总结
CyclicBarrier
适用于需要多个线程同步到达某个点后再继续执行的场景,典型案例包括:
多线程数据合并:如分布式计算中,多个线程计算数据片段,等待所有线程完成后合并结果。
并发测试:如模拟100个用户同时登录,确保所有用户线程同时开始执行登录逻辑。
游戏关卡加载:如多人游戏中,所有玩家加载完当前关卡后,才能进入下一关。
七、总结
CyclicBarrier
是一个循环性、互相等待的同步工具,其核心是通过ReentrantLock
和Condition
实现线程的等待/通知,通过Generation
实现循环性。深入理解其源码中的代机制、屏障动作和异常处理,能帮助我们更好地使用它,避免出现同步问题。
与CountDownLatch
相比,CyclicBarrier
更适合需要重复同步的场景,而CountDownLatch
更适合一次性等待的场景。在实际开发中,需根据具体需求选择合适的同步工具。