Skip to content

Java Semaphore 详解与源码剖析

一、Semaphore 概述

Semaphore(信号量)是 Java 并发包 java.util.concurrent 中的核心工具类,用于控制同时访问特定资源的线程数量。它通过协调多个线程,实现对共享资源的合理分配,避免因资源竞争导致的并发问题。

核心作用

  • 限流:限制并发访问资源的线程数(如数据库连接池、线程池资源控制)。
  • 资源池管理:控制可重复使用资源的并发访问量(如连接池中的连接数)。
  • 并发协作:通过动态调整许可数,实现线程间的复杂协作逻辑。

与其他并发工具的区别

工具类核心功能特点对比
Semaphore控制资源并发访问数量许可可动态增减,支持公平/非公平模式
CountDownLatch等待多个线程完成后继续执行计数器只能递减至 0,不可重置
CyclicBarrier多个线程互相等待至屏障点后继续所有线程到达后可重置屏障,适用于循环任务

二、Semaphore 基本使用

Semaphore 的使用流程通常为:创建实例(指定许可数)→ 线程获取许可(acquire)→ 访问资源 → 线程释放许可(release

1. 核心 API

方法签名功能描述
Semaphore(int permits)创建非公平信号量,初始许可数为 permits
Semaphore(int permits, boolean fair)创建信号量,fair=true 时为公平模式,按线程等待顺序分配许可
void acquire() throws InterruptedException获取 1 个许可,若无可用许可则阻塞,支持中断
void acquire(int permits)获取 permits 个许可,同上
boolean tryAcquire()尝试获取 1 个许可,立即返回(成功 true/失败 false),不阻塞
boolean tryAcquire(long timeout, TimeUnit unit)在超时时间内尝试获取 1 个许可,支持中断
void release()释放 1 个许可(归还信号量)
void release(int permits)释放 permits 个许可
int availablePermits()返回当前可用许可数
int drainPermits()获取并清空所有可用许可(返回获取的许可数)

2. 使用示例:限流控制

以下示例模拟 10 个线程竞争 3 个许可(即最多 3 个线程同时访问资源):

java
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    // 创建非公平信号量,初始许可数为 3
    private static final Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {
        // 创建 10 个线程模拟并发访问
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    // 获取许可(若无许可则阻塞)
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + " 获取许可,开始访问资源");
                    Thread.sleep(1000); // 模拟资源访问耗时
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // 释放许可(必须在 finally 中执行,避免许可泄露)
                    semaphore.release();
                    System.out.println(Thread.currentThread().getName() + " 释放许可,当前可用许可:" + semaphore.availablePermits());
                }
            }, "Thread-" + i).start();
        }
    }
}

输出结果(片段):

Thread-0 获取许可,开始访问资源
Thread-1 获取许可,开始访问资源
Thread-2 获取许可,开始访问资源
Thread-0 释放许可,当前可用许可:1
Thread-3 获取许可,开始访问资源
Thread-1 释放许可,当前可用许可:1
Thread-4 获取许可,开始访问资源
...

说明:最多 3 个线程同时获取许可,释放后其他线程才能继续获取,实现了限流效果。

三、Semaphore 源码结构剖析

Semaphore 基于 AQS(AbstractQueuedSynchronizer) 实现,核心逻辑封装在内部同步器 Sync 中,并通过 FairSync(公平模式)和 NonfairSync(非公平模式)两个子类实现不同的许可分配策略。

1. 类结构总览

java
public class Semaphore implements java.io.Serializable {
    private final Sync sync; // 核心同步器,继承 AQS

    // 抽象同步器基类(继承 AQS)
    abstract static class Sync extends AbstractQueuedSynchronizer { ... }

    // 非公平模式同步器
    static final class NonfairSync extends Sync { ... }

    // 公平模式同步器
    static final class FairSync extends Sync { ... }

    // 构造方法
    public Semaphore(int permits) { sync = new NonfairSync(permits); }
    public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }

    // 核心方法(acquire/release 等)
    public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    public void release() { sync.releaseShared(1); }
    ...
}

2. 核心同步器 Sync

Sync 继承 AQS,并复用 AQS 的 state 变量表示当前可用许可数。核心方法包括许可的获取(tryAcquireShared)、释放(tryReleaseShared)及辅助操作。

2.1 Sync 构造方法

java
abstract static class Sync extends AbstractQueuedSynchronizer {
    Sync(int permits) {
        setState(permits); // 初始化 AQS 的 state 为许可数
    }

    final int getPermits() {
        return getState(); // 返回当前可用许可数
    }
    ...
}
  • state 含义:AQS 的 state 被复用为 Semaphore 的“可用许可数”,初始值由构造方法传入的 permits 决定。

2.2 非公平模式:NonfairSync

非公平模式下,新线程会优先尝试抢占许可,即使等待队列中已有线程排队。核心逻辑在 tryAcquireShared 方法:

java
static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
        super(permits);
    }

    // 尝试获取共享许可(AQS 共享模式核心方法)
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires); // 调用父类的非公平获取逻辑
    }
}

// Sync 类中的非公平获取实现
final int nonfairTryAcquireShared(int acquires) {
    for (;;) { // 自旋 CAS
        int available = getState(); // 当前可用许可数
        int remaining = available - acquires; // 获取后剩余许可数
        // 若剩余许可 <0,直接返回负数(获取失败);否则 CAS 尝试更新 state
        if (remaining < 0 || compareAndSetState(available, remaining)) {
            return remaining; // 返回剩余许可数(>=0 表示成功)
        }
    }
}

逻辑解析

  1. 直接计算剩余许可数 remaining = available - acquires
  2. remaining >=0,通过 CAS 原子更新 stateremaining,成功则返回 remaining(获取成功);
  3. 若 CAS 失败(并发竞争),自旋重试;若 remaining <0,返回负数(获取失败,进入等待队列)。

2.3 公平模式:FairSync

公平模式下,线程需严格按等待队列顺序获取许可(FIFO),新线程不会抢占已有排队线程的许可。核心逻辑在 tryAcquireShared 方法:

java
static final class FairSync extends Sync {
    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 公平模式核心:先检查等待队列中是否有前驱线程(即是否有线程排队)
            if (hasQueuedPredecessors()) {
                return -1; // 有前驱线程,当前线程需排队,返回失败
            }
            // 无排队线程,再尝试 CAS 获取许可(逻辑同非公平模式)
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining)) {
                return remaining;
            }
        }
    }
}

关键区别
公平模式比非公平模式多了 hasQueuedPredecessors() 检查。该方法由 AQS 提供,用于判断“当前线程是否需要排队”(即等待队列中是否有比当前线程更早等待的线程)。若有,则当前线程直接返回失败,进入队列尾部等待。

四、核心方法源码深度解析

Semaphore 的核心功能通过 acquire(获取许可)和 release(释放许可)实现,两者均基于 AQS 的共享模式 API。

1. acquire():获取许可

acquire() 用于获取 1 个许可,若当前无可用许可,线程会阻塞并进入 AQS 等待队列,支持中断。

java
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1); // 调用 AQS 的共享可中断获取方法
}

AQS 的 acquireSharedInterruptibly 流程

AQS 中 acquireSharedInterruptibly 的核心逻辑(简化版):

java
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) { // 检查中断状态
        throw new InterruptedException();
    }
    // 调用 Semaphore 同步器的 tryAcquireShared 尝试获取许可
    if (tryAcquireShared(arg) < 0) { 
        doAcquireSharedInterruptibly(arg); // 获取失败,进入等待队列阻塞
    }
}
  • 步骤
    1. 检查线程中断状态,若中断则抛出 InterruptedException
    2. 调用 tryAcquireShared(arg)(由 NonfairSyncFairSync 实现)尝试获取许可:
      • 若返回值 >=0:获取成功,直接返回;
      • 若返回值 <0:获取失败,调用 doAcquireSharedInterruptibly 将线程加入 AQS 等待队列并阻塞。

2. release():释放许可

release() 用于释放 1 个许可(归还信号量),可能唤醒等待队列中的线程。

java
public void release() {
    sync.releaseShared(1); // 调用 AQS 的共享释放方法
}

AQS 的 releaseShared 流程

AQS 中 releaseShared 的核心逻辑(简化版):

java
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // 调用 Semaphore 同步器的 tryReleaseShared 释放许可
        doReleaseShared(); // 释放成功,唤醒等待队列中的线程
        return true;
    }
    return false;
}

tryReleaseShared 实现(Sync 类中)

java
protected final boolean tryReleaseShared(int releases) {
    for (;;) { // 自旋 CAS
        int current = getState(); // 当前许可数
        int next = current + releases; // 释放后许可数(releases 为释放的许可数)
        if (next < current) { // 溢出检查(release 过多可能导致 next 溢出为负数)
            throw new Error("Maximum permit count exceeded");
        }
        // CAS 更新 state 为 next,成功则返回 true(释放成功)
        if (compareAndSetState(current, next)) {
            return true;
        }
    }
}

逻辑解析
释放许可本质是增加 state(可用许可数),通过自旋 CAS 保证原子性。释放成功后,AQS 的 doReleaseShared 会唤醒等待队列中的后续线程,让它们重新尝试获取许可。

3. 其他重要方法

drainPermits():清空所有许可

java
public int drainPermits() {
    return sync.drainPermits();
}

// Sync 类中实现
final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0)) {
            return current; // 返回并清空所有许可
        }
    }
}

reducePermits(int reduction):减少许可数

java
protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

// Sync 类中实现
final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) { // underflow
            throw new Error("Permit count underflow");
        }
        if (compareAndSetState(current, next)) {
            return;
        }
    }
}
  • 注意reducePermitsprotected 方法,需通过子类调用,用于动态减少许可数(如资源池缩容)。

五、公平与非公平模式的性能对比

  • 非公平模式:新线程会优先抢占许可,可能导致等待队列中的线程长期饥饿,但吞吐量更高(减少线程切换开销)。
  • 公平模式:严格按 FIFO 顺序分配许可,避免饥饿,但需频繁检查等待队列(hasQueuedPredecessors()),性能略低。

建议:无特殊公平性要求时,优先使用非公平模式(默认)。

六、注意事项

  1. 许可释放必须配对acquire 后必须在 finally 中调用 release,否则会导致许可泄露(信号量永久减少)。
  2. 避免过度释放release 的许可数若超过 acquire 的数量,会导致 state 持续增加(可能溢出)。
  3. 中断处理acquire() 支持中断,需捕获 InterruptedException 并处理(如恢复中断状态或终止任务)。
  4. 动态调整许可:通过 release(增加)和 reducePermits(减少)可动态调整许可数,但需注意线程安全。

七、总结

Semaphore 是基于 AQS 共享模式实现的限流工具,通过 state 变量维护可用许可数,核心逻辑为:

  • 获取许可:通过 tryAcquireShared 减少 state,失败则进入 AQS 等待队列;
  • 释放许可:通过 tryReleaseShared 增加 state,成功则唤醒等待线程。

其公平/非公平模式的区别在于是否尊重等待队列顺序,适用于资源池控制、限流等场景。理解 Semaphore 的源码实现,需深入掌握 AQS 的共享模式及同步队列机制。

通过以上剖析,相信你已对 Semaphore 的原理和使用有了全面理解。实际开发中,需结合业务场景选择合适的模式,并注意许可的正确获取与释放,避免并发问题。