Appearance
Java Semaphore 详解与源码剖析
一、Semaphore 概述
Semaphore
(信号量)是 Java 并发包 java.util.concurrent
中的核心工具类,用于控制同时访问特定资源的线程数量。它通过协调多个线程,实现对共享资源的合理分配,避免因资源竞争导致的并发问题。
核心作用
- 限流:限制并发访问资源的线程数(如数据库连接池、线程池资源控制)。
- 资源池管理:控制可重复使用资源的并发访问量(如连接池中的连接数)。
- 并发协作:通过动态调整许可数,实现线程间的复杂协作逻辑。
与其他并发工具的区别
工具类 | 核心功能 | 特点对比 |
---|---|---|
Semaphore | 控制资源并发访问数量 | 许可可动态增减,支持公平/非公平模式 |
CountDownLatch | 等待多个线程完成后继续执行 | 计数器只能递减至 0,不可重置 |
CyclicBarrier | 多个线程互相等待至屏障点后继续 | 所有线程到达后可重置屏障,适用于循环任务 |
二、Semaphore 基本使用
Semaphore
的使用流程通常为:创建实例(指定许可数)→ 线程获取许可(acquire
)→ 访问资源 → 线程释放许可(release
)。
1. 核心 API
方法签名 | 功能描述 |
---|---|
Semaphore(int permits) | 创建非公平信号量,初始许可数为 permits |
Semaphore(int permits, boolean fair) | 创建信号量,fair=true 时为公平模式,按线程等待顺序分配许可 |
void acquire() throws InterruptedException | 获取 1 个许可,若无可用许可则阻塞,支持中断 |
void acquire(int permits) | 获取 permits 个许可,同上 |
boolean tryAcquire() | 尝试获取 1 个许可,立即返回(成功 true /失败 false ),不阻塞 |
boolean tryAcquire(long timeout, TimeUnit unit) | 在超时时间内尝试获取 1 个许可,支持中断 |
void release() | 释放 1 个许可(归还信号量) |
void release(int permits) | 释放 permits 个许可 |
int availablePermits() | 返回当前可用许可数 |
int drainPermits() | 获取并清空所有可用许可(返回获取的许可数) |
2. 使用示例:限流控制
以下示例模拟 10 个线程竞争 3 个许可(即最多 3 个线程同时访问资源):
java
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
// 创建非公平信号量,初始许可数为 3
private static final Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) {
// 创建 10 个线程模拟并发访问
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
// 获取许可(若无许可则阻塞)
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 获取许可,开始访问资源");
Thread.sleep(1000); // 模拟资源访问耗时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 释放许可(必须在 finally 中执行,避免许可泄露)
semaphore.release();
System.out.println(Thread.currentThread().getName() + " 释放许可,当前可用许可:" + semaphore.availablePermits());
}
}, "Thread-" + i).start();
}
}
}
输出结果(片段):
Thread-0 获取许可,开始访问资源
Thread-1 获取许可,开始访问资源
Thread-2 获取许可,开始访问资源
Thread-0 释放许可,当前可用许可:1
Thread-3 获取许可,开始访问资源
Thread-1 释放许可,当前可用许可:1
Thread-4 获取许可,开始访问资源
...
说明:最多 3 个线程同时获取许可,释放后其他线程才能继续获取,实现了限流效果。
三、Semaphore 源码结构剖析
Semaphore
基于 AQS(AbstractQueuedSynchronizer) 实现,核心逻辑封装在内部同步器 Sync
中,并通过 FairSync
(公平模式)和 NonfairSync
(非公平模式)两个子类实现不同的许可分配策略。
1. 类结构总览
java
public class Semaphore implements java.io.Serializable {
private final Sync sync; // 核心同步器,继承 AQS
// 抽象同步器基类(继承 AQS)
abstract static class Sync extends AbstractQueuedSynchronizer { ... }
// 非公平模式同步器
static final class NonfairSync extends Sync { ... }
// 公平模式同步器
static final class FairSync extends Sync { ... }
// 构造方法
public Semaphore(int permits) { sync = new NonfairSync(permits); }
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
// 核心方法(acquire/release 等)
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public void release() { sync.releaseShared(1); }
...
}
2. 核心同步器 Sync
Sync
继承 AQS
,并复用 AQS 的 state 变量表示当前可用许可数。核心方法包括许可的获取(tryAcquireShared
)、释放(tryReleaseShared
)及辅助操作。
2.1 Sync
构造方法
java
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits); // 初始化 AQS 的 state 为许可数
}
final int getPermits() {
return getState(); // 返回当前可用许可数
}
...
}
- state 含义:AQS 的
state
被复用为 Semaphore 的“可用许可数”,初始值由构造方法传入的permits
决定。
2.2 非公平模式:NonfairSync
非公平模式下,新线程会优先尝试抢占许可,即使等待队列中已有线程排队。核心逻辑在 tryAcquireShared
方法:
java
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
// 尝试获取共享许可(AQS 共享模式核心方法)
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires); // 调用父类的非公平获取逻辑
}
}
// Sync 类中的非公平获取实现
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // 自旋 CAS
int available = getState(); // 当前可用许可数
int remaining = available - acquires; // 获取后剩余许可数
// 若剩余许可 <0,直接返回负数(获取失败);否则 CAS 尝试更新 state
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining; // 返回剩余许可数(>=0 表示成功)
}
}
}
逻辑解析:
- 直接计算剩余许可数
remaining = available - acquires
; - 若
remaining >=0
,通过 CAS 原子更新state
为remaining
,成功则返回remaining
(获取成功); - 若 CAS 失败(并发竞争),自旋重试;若
remaining <0
,返回负数(获取失败,进入等待队列)。
2.3 公平模式:FairSync
公平模式下,线程需严格按等待队列顺序获取许可(FIFO),新线程不会抢占已有排队线程的许可。核心逻辑在 tryAcquireShared
方法:
java
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
// 公平模式核心:先检查等待队列中是否有前驱线程(即是否有线程排队)
if (hasQueuedPredecessors()) {
return -1; // 有前驱线程,当前线程需排队,返回失败
}
// 无排队线程,再尝试 CAS 获取许可(逻辑同非公平模式)
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}
关键区别:
公平模式比非公平模式多了 hasQueuedPredecessors()
检查。该方法由 AQS 提供,用于判断“当前线程是否需要排队”(即等待队列中是否有比当前线程更早等待的线程)。若有,则当前线程直接返回失败,进入队列尾部等待。
四、核心方法源码深度解析
Semaphore
的核心功能通过 acquire
(获取许可)和 release
(释放许可)实现,两者均基于 AQS 的共享模式 API。
1. acquire()
:获取许可
acquire()
用于获取 1 个许可,若当前无可用许可,线程会阻塞并进入 AQS 等待队列,支持中断。
java
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1); // 调用 AQS 的共享可中断获取方法
}
AQS 的 acquireSharedInterruptibly
流程
AQS 中 acquireSharedInterruptibly
的核心逻辑(简化版):
java
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) { // 检查中断状态
throw new InterruptedException();
}
// 调用 Semaphore 同步器的 tryAcquireShared 尝试获取许可
if (tryAcquireShared(arg) < 0) {
doAcquireSharedInterruptibly(arg); // 获取失败,进入等待队列阻塞
}
}
- 步骤:
- 检查线程中断状态,若中断则抛出
InterruptedException
; - 调用
tryAcquireShared(arg)
(由NonfairSync
或FairSync
实现)尝试获取许可:- 若返回值
>=0
:获取成功,直接返回; - 若返回值
<0
:获取失败,调用doAcquireSharedInterruptibly
将线程加入 AQS 等待队列并阻塞。
- 若返回值
- 检查线程中断状态,若中断则抛出
2. release()
:释放许可
release()
用于释放 1 个许可(归还信号量),可能唤醒等待队列中的线程。
java
public void release() {
sync.releaseShared(1); // 调用 AQS 的共享释放方法
}
AQS 的 releaseShared
流程
AQS 中 releaseShared
的核心逻辑(简化版):
java
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 调用 Semaphore 同步器的 tryReleaseShared 释放许可
doReleaseShared(); // 释放成功,唤醒等待队列中的线程
return true;
}
return false;
}
tryReleaseShared
实现(Sync
类中)
java
protected final boolean tryReleaseShared(int releases) {
for (;;) { // 自旋 CAS
int current = getState(); // 当前许可数
int next = current + releases; // 释放后许可数(releases 为释放的许可数)
if (next < current) { // 溢出检查(release 过多可能导致 next 溢出为负数)
throw new Error("Maximum permit count exceeded");
}
// CAS 更新 state 为 next,成功则返回 true(释放成功)
if (compareAndSetState(current, next)) {
return true;
}
}
}
逻辑解析:
释放许可本质是增加 state
(可用许可数),通过自旋 CAS 保证原子性。释放成功后,AQS 的 doReleaseShared
会唤醒等待队列中的后续线程,让它们重新尝试获取许可。
3. 其他重要方法
drainPermits()
:清空所有许可
java
public int drainPermits() {
return sync.drainPermits();
}
// Sync 类中实现
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0)) {
return current; // 返回并清空所有许可
}
}
}
reducePermits(int reduction)
:减少许可数
java
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
// Sync 类中实现
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) { // underflow
throw new Error("Permit count underflow");
}
if (compareAndSetState(current, next)) {
return;
}
}
}
- 注意:
reducePermits
是protected
方法,需通过子类调用,用于动态减少许可数(如资源池缩容)。
五、公平与非公平模式的性能对比
- 非公平模式:新线程会优先抢占许可,可能导致等待队列中的线程长期饥饿,但吞吐量更高(减少线程切换开销)。
- 公平模式:严格按 FIFO 顺序分配许可,避免饥饿,但需频繁检查等待队列(
hasQueuedPredecessors()
),性能略低。
建议:无特殊公平性要求时,优先使用非公平模式(默认)。
六、注意事项
- 许可释放必须配对:
acquire
后必须在finally
中调用release
,否则会导致许可泄露(信号量永久减少)。 - 避免过度释放:
release
的许可数若超过acquire
的数量,会导致state
持续增加(可能溢出)。 - 中断处理:
acquire()
支持中断,需捕获InterruptedException
并处理(如恢复中断状态或终止任务)。 - 动态调整许可:通过
release
(增加)和reducePermits
(减少)可动态调整许可数,但需注意线程安全。
七、总结
Semaphore
是基于 AQS 共享模式实现的限流工具,通过 state
变量维护可用许可数,核心逻辑为:
- 获取许可:通过
tryAcquireShared
减少state
,失败则进入 AQS 等待队列; - 释放许可:通过
tryReleaseShared
增加state
,成功则唤醒等待线程。
其公平/非公平模式的区别在于是否尊重等待队列顺序,适用于资源池控制、限流等场景。理解 Semaphore
的源码实现,需深入掌握 AQS 的共享模式及同步队列机制。
通过以上剖析,相信你已对 Semaphore
的原理和使用有了全面理解。实际开发中,需结合业务场景选择合适的模式,并注意许可的正确获取与释放,避免并发问题。