V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
ProgrammerAlan
V2EX  ›  程序员

[线程同步工具] Semaphore 源码解析

  •  
  •   ProgrammerAlan · 2023-02-20 17:01:13 +08:00 · 296 次点击
    这是一个创建于 602 天前的主题,其中的信息可能已经有所发展或是发生改变。

    控制对资源的一个或多个副本的并发访问

    Java API 提供了一种信号量机制 Semaphore 。 一个信号量就是一个计数器, 可用于保护对一个或多个共享资源的访问。

    当一个线程要访问多个共享资源中的一个时,它首先需要获得一个信号量。如果信号量内部的计数器的值大于 0 ,那么信号量就递减计数器并允许线程访问。计数器的值大于 0 意味着存在可用的空闲资源,所以线程能够访问并使用这些资源中的一个。 如果计数器的值为 0 ,信号量会让线程休眠,直到计数器的值大于 0 。计数器的值为 0 意味着所有共享资源都被其他线程占用了,所以当前想要使用资源的线程,必须等待其中一个资源被释放 。

    Semaphore 源码分析

    Sync 类

    /**
     * Synchronization implementation for semaphore.  Uses AQS state
     * to represent permits. Subclassed into fair and nonfair
     * versions.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
    
        Sync(int permits) {
            setState(permits);
        }
    
        final int getPermits() {
            return getState();
        }
    
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
    
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
    

    这是 Semaphore 中定义的一个抽象内部类 Sync ,用于实现信号量的同步机制。该类继承了 AbstractQueuedSynchronizer ,并重写了其中的一些方法,同时提供了一些新的方法来实现 Semaphore 的不同操作。

    其中,Sync 类有一个整型变量 state ,用于表示当前 Semaphore 中的可用许可数量。每当一个线程获取了一个许可时,state 的值减 1 ,每当一个线程释放了一个许可时,state 的值加 1 。

    Sync 类中定义了以下方法:

    • 构造方法 Sync(int permits):构造一个具有指定许可数的 Sync 实例,将许可数存储在 state 中。
    • getPermits():获取当前 Semaphore 中可用的许可数量。
    • nonfairTryAcquireShared(int acquires):非公平模式下尝试获取指定数量的许可。如果当前可用的许可数量不足,则返回负数;否则原子减少 state 的值并返回剩余许可数。
    • tryReleaseShared(int releases):尝试释放指定数量的许可,如果释放成功则返回 true ,否则返回 false 。
    • reducePermits(int reductions):减少 Semaphore 中可用的许可数量,该操作不可撤回。如果减少的数量大于当前可用的许可数量,将抛出异常。
    • drainPermits():返回当前 Semaphore 中可用的所有许可,并将 state 值原子更新为 0 。如果当前没有可用的许可,返回值为 0 。

    其中,许可的获取和释放操作是通过对 state 进行原子操作来实现的。

    在代码实现中,nonfairTryAcquireShared() 方法使用了自旋操作,不断检查 state 的值,直到当前线程成功获取许可或者其他线程释放许可后,才退出自旋。

    reducePermits() 方法是用于减少 Semaphore 许可数量的,它使用自旋方式将当前 state 值减去指定数量的许可,并且对减去后的结果进行检查,以确保结果值不会小于 0 。

    在减少许可的时候,也使用了原子操作,以保证多个线程同时调用 reducePermits() 方法时不会导致并发问题。

    FairSync 类

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
    
        FairSync(int permits) {
            super(permits);
        }
    
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    

    这段代码用于公平模式下获取许可证。先检测前面是否有排队的,如果有排队的则获取许可失败,进入队列排队,否则尝试原子更新 state 的值。

    hasQueuedPredecessors ()判断当前线程是否在等待队列中。

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    这段代码的实现思路比较复杂,我们先来了解一点 AQS 队列的结构和细节。在 AQS 队列中,每个等待线程都被包装成一个 Node 对象,这些 Node 对象通过 next 指针形成了一个 FIFO (先进先出)队列。Node 节点中包含了线程本身以及一些状态信息,如前继节点、后继节点等。等待队列的头结点( head )是当前持有许可证的线程所对应的节点,等待队列的尾节点( tail )是最后一个正在等待的线程所对应的节点。

    再来接着分析 hasQueuedPredecessors()的实现逻辑,首先它会先获取等待队列中的头节点和尾节点,然后判断头节点和尾节点是否相同。如果相同,说明当前线程是第一个在等待队列中的线程,返回 false 。否则,获取头节点的下一个节点,判断下一个节点的线程是否为当前线程,如果是,则说明当前线程在等待队列中,返回 true ,否则返回 false 。

    Semaphore 用法

    使用 Semaphore 实现限流

    限流是一种在分布式系统中常用的流量控制方式,可以防止因流量过大而导致的系统崩溃、服务降级等问题。一般情况下,限流是通过限制并发请求、请求速率等方式来实现的,比如使用令牌桶、漏桶等算法。在网关层进行限流可以减轻后端服务的压力,保证服务的可用性和稳定性。而在某些场景下,也可以在应用程序中自己实现限流,比如秒杀场景中限制并发请求的数量,避免过多的请求导致系统崩溃。

    下面我们使用 Semaphore 实现一个简单的限流功能:

    public class SemaphoreTest {
        public static final Semaphore SEMAPHORE = new Semaphore(100);
        public static final AtomicInteger failCount = new AtomicInteger(0);
        public static final AtomicInteger successCount = new AtomicInteger(0);
        public static void main(String[] args) {
            for (int i = 0; i < 1000; i++) {
                new Thread(()->seckill()).start();
            }
        }
        public static boolean seckill() {
            if (!SEMAPHORE.tryAcquire()) {
                System.out.println("no permits, count="+failCount.incrementAndGet());
                return false;
            }
            try {
                // 处理业务逻辑
                Thread.sleep(2000);
                System.out.println("seckill success, count="+successCount.incrementAndGet());
            } catch (InterruptedException e) {
                // todo 处理异常
                e.printStackTrace();
            } finally {
                SEMAPHORE.release();
            }
            return true;
        }
    }
    

    作者简介

    鑫茂,深圳,Java 开发工程师,2022 年 3 月参加工作。

    喜读思维方法、哲学心理学以及历史等方面的书,偶尔写些文字。

    希望通过文章,结识更多同道中人。

    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   997 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 22:13 · PVG 06:13 · LAX 15:13 · JFK 18:13
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.