[线程同步工具] Semaphore 源码解析

2023-02-20 17:01:13 +08:00
 ProgrammerAlan

控制对资源的一个或多个副本的并发访问

Java API 提供了一种信号量机制 Semaphore 。 一个信号量就是一个计数器, 可用于保护对一个或多个共享资源的访问。

当一个线程要访问多个共享资源中的一个时,它首先需要获得一个信号量。如果信号量内部的计数器的值大于 0 ,那么信号量就递减计数器并允许线程访问。计数器的值大于 0 意味着存在可用的空闲资源,所以线程能够访问并使用这些资源中的一个。 如果计数器的值为 0 ,信号量会让线程休眠,直到计数器的值大于 0 。计数器的值为 0 意味着所有共享资源都被其他线程占用了,所以当前想要使用资源的线程,必须等待其中一个资源被释放 。

Semaphore 源码分析

Sync 类

/**
 * Synchronization implementation for semaphore.  Uses AQS state
 * to represent permits. Subclassed into fair and nonfair
 * versions.
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
        setState(permits);
    }

    final int getPermits() {
        return getState();
    }

    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }

    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

这是 Semaphore 中定义的一个抽象内部类 Sync ,用于实现信号量的同步机制。该类继承了 AbstractQueuedSynchronizer ,并重写了其中的一些方法,同时提供了一些新的方法来实现 Semaphore 的不同操作。

其中,Sync 类有一个整型变量 state ,用于表示当前 Semaphore 中的可用许可数量。每当一个线程获取了一个许可时,state 的值减 1 ,每当一个线程释放了一个许可时,state 的值加 1 。

Sync 类中定义了以下方法:

其中,许可的获取和释放操作是通过对 state 进行原子操作来实现的。

在代码实现中,nonfairTryAcquireShared() 方法使用了自旋操作,不断检查 state 的值,直到当前线程成功获取许可或者其他线程释放许可后,才退出自旋。

reducePermits() 方法是用于减少 Semaphore 许可数量的,它使用自旋方式将当前 state 值减去指定数量的许可,并且对减去后的结果进行检查,以确保结果值不会小于 0 。

在减少许可的时候,也使用了原子操作,以保证多个线程同时调用 reducePermits() 方法时不会导致并发问题。

FairSync 类

/**
 * Fair version
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

这段代码用于公平模式下获取许可证。先检测前面是否有排队的,如果有排队的则获取许可失败,进入队列排队,否则尝试原子更新 state 的值。

hasQueuedPredecessors ()判断当前线程是否在等待队列中。

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

这段代码的实现思路比较复杂,我们先来了解一点 AQS 队列的结构和细节。在 AQS 队列中,每个等待线程都被包装成一个 Node 对象,这些 Node 对象通过 next 指针形成了一个 FIFO (先进先出)队列。Node 节点中包含了线程本身以及一些状态信息,如前继节点、后继节点等。等待队列的头结点( head )是当前持有许可证的线程所对应的节点,等待队列的尾节点( tail )是最后一个正在等待的线程所对应的节点。

再来接着分析 hasQueuedPredecessors()的实现逻辑,首先它会先获取等待队列中的头节点和尾节点,然后判断头节点和尾节点是否相同。如果相同,说明当前线程是第一个在等待队列中的线程,返回 false 。否则,获取头节点的下一个节点,判断下一个节点的线程是否为当前线程,如果是,则说明当前线程在等待队列中,返回 true ,否则返回 false 。

Semaphore 用法

使用 Semaphore 实现限流

限流是一种在分布式系统中常用的流量控制方式,可以防止因流量过大而导致的系统崩溃、服务降级等问题。一般情况下,限流是通过限制并发请求、请求速率等方式来实现的,比如使用令牌桶、漏桶等算法。在网关层进行限流可以减轻后端服务的压力,保证服务的可用性和稳定性。而在某些场景下,也可以在应用程序中自己实现限流,比如秒杀场景中限制并发请求的数量,避免过多的请求导致系统崩溃。

下面我们使用 Semaphore 实现一个简单的限流功能:

public class SemaphoreTest {
    public static final Semaphore SEMAPHORE = new Semaphore(100);
    public static final AtomicInteger failCount = new AtomicInteger(0);
    public static final AtomicInteger successCount = new AtomicInteger(0);
    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            new Thread(()->seckill()).start();
        }
    }
    public static boolean seckill() {
        if (!SEMAPHORE.tryAcquire()) {
            System.out.println("no permits, count="+failCount.incrementAndGet());
            return false;
        }
        try {
            // 处理业务逻辑
            Thread.sleep(2000);
            System.out.println("seckill success, count="+successCount.incrementAndGet());
        } catch (InterruptedException e) {
            // todo 处理异常
            e.printStackTrace();
        } finally {
            SEMAPHORE.release();
        }
        return true;
    }
}

作者简介

鑫茂,深圳,Java 开发工程师,2022 年 3 月参加工作。

喜读思维方法、哲学心理学以及历史等方面的书,偶尔写些文字。

希望通过文章,结识更多同道中人。

296 次点击
所在节点    程序员
0 条回复

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/917679

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX