动态线程池(DynamicTp),动态调整 Tomcat、Jetty、Undertow 线程池参数篇

2022-04-26 16:28:49 +08:00
 yanhomlin

大家好,这篇文章我们来介绍下动态线程池框架( DynamicTp )的 adapter 模块,上篇文章也大概介绍过了,该模块主要是用来适配一些第三方组件的线程池管理,让第三方组件内置的线程池也能享受到动态参数调整,监控告警这些增强功能。


DynamicTp 项目地址

目前近 1k star ,感谢你的 star ,欢迎 pr ,业务之余给开源贡献一份力量

gitee 地址https://gitee.com/yanhom/dynamic-tp

github 地址https://github.com/lyh200/dynamic-tp


系列文章

美团动态线程池实践思路,开源了

动态线程池框架( DynamicTp ),监控及源码解析篇


adapter 已接入组件

adapter 模块目前已经接入了 SpringBoot 内置的三大 WebServer ( Tomcat 、Jetty 、Undertow )的线程池管理,实现层面也是和核心模块做了解耦,利用 spring 的事件机制进行通知监听处理。

可以看出有两个监听器

  1. 当监听到配置中心配置变更时,在更新我们项目内部线程池后会发布一个 RefreshEvent 事件,DtpWebRefreshListener 监听到该事件后会去更新对应 WebServer 的线程池参数。

  2. 同样监控告警也是如此,在 DtpMonitor 中执行监控任务时会发布 CollectEvent 事件,DtpWebCollectListener 监听到该事件后会去采集相应 WebServer 的线程池指标数据。

要想去管理第三方组件的线程池,首先肯定要对这些组件有一定的熟悉度,了解整个请求的一个处理过程,找到对应处理请求的线程池,这些线程池不一定是 JUC 包下的 ThreadPoolExecutor 类,也可能是组件自己实现的线程池,但是基本原理都差不多。

Tomcat 、Jetty 、Undertow 这三个都是这样,他们并没有直接使用 JUC 提供的线程池实现,而是自己实现了一套,或者扩展了 JUC 的实现;翻源码找到相应的线程池后,然后看有没有暴露 public 方法供我们调用获取,如果没有就需要考虑通过反射来拿了。


Tomcat 内部线程池的实现

1.继承 JUC 原生 ThreadPoolExecutor ( 9.0.50 版本及以下),并覆写了一些方法,主要 execute()和 afterExecute()

2.继承 JUC 的 AbstractExecutorService ( 9.0.51 版本及以上),代码基本是拷贝 JUC 的 ThreadPoolExecutor ,也相应的微调了 execute()方法

注意 Tomcat 实现的线程池类名称也叫 ThreadPoolExecutor ,名字跟 JUC 下的是一样的,Tomcat 的 ThreadPoolExecutor 类 execute()方法如下:

public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }

        }
    }

可以看出他是先调用父类的 execute()方法,然后捕获 RejectedExecutionException 异常,再去判断如果任务队列类型是 TaskQueue ,则尝试将任务添加到任务队列中,如果添加失败,证明队列已满,然后再执行拒绝策略,此处 submittedCount 是一个原子变量,记录提交到此线程池但未执行完成的任务数(主要在下面要提到的 TaskQueue 队列的 offer()方法用),为什么要这样设计呢?继续往下看!

 @Override
    public boolean offer(Runnable o) {
        //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

可以看到他在入队之前做了几个判断,这里的 parent 就是所属的线程池对象

1.如果 parent 为 null ,直接调用父类 offer 方法入队

2.如果当前线程数等于最大线程数,则直接调用父类 offer()方法入队

3.如果当前未执行的任务数量小于等于当前线程数,仔细思考下,是不是说明有空闲的线程呢,那么直接调用父类 offer()入队后就马上有线程去执行它

4.如果当前线程数小于最大线程数量,则直接返回 false ,然后回到 JUC 线程池的执行流程回想下,是不是就去添加新线程去执行任务了呢

5.其他情况都直接入队

1.判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务

2.如果当前线程数大于核心线程数且队列没满,则将任务放入任务队列等待执行

3.如果当前当前线程池数大于核心线程池,小于最大线程数,且任务队列已满,则创建新的线程执行提交的任务

4.如果当前线程数等于最大线程数,且队列已满,则拒绝该任务

可以看出当当前线程数大于核心线程数时,JUC 原生线程池首先是把任务放到队列里等待执行,而不是先创建线程执行。

如果 Tomcat 接收的请求数量大于核心线程数,请求就会被放到队列中,等待核心线程处理,这样会降低请求的总体处理速度,所以 Tomcat 并没有使用 JUC 原生线程池,利用 TaskQueue 的 offer()方法巧妙的修改了 JUC 线程池的执行流程,改写后 Tomcat 线程池执行流程如下:

1.判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务

2.如果当前当前线程池数大于核心线程池,小于最大线程数,则创建新的线程执行提交的任务

3.如果当前线程数等于最大线程数,则将任务放入任务队列等待执行

4.如果队列已满,则执行拒绝策略

    public Executor doGetTp(WebServer webServer) {
        TomcatWebServer tomcatWebServer = (TomcatWebServer) webServer;
        return tomcatWebServer.getTomcat().getConnector().getProtocolHandler().getExecutor();
    }
spring:
  dynamic:
    tp:
      // 其他配置项
      tomcatTp:       # tomcat web server 线程池配置
        minSpare: 100   # 核心线程数
        max: 400        # 最大线程数

Tomcat 线程池就介绍到这里吧,通过以上的一些介绍想必大家对 Tomcat 线程池执行任务的流程都很清楚了吧。


Jetty 内部线程池的实现

public void execute(Runnable job)
    {
        // Determine if we need to start a thread, use and idle thread or just queue this job
        int startThread;
        while (true)
        {
            // Get the atomic counts
            long counts = _counts.get();

            // Get the number of threads started (might not yet be running)
            int threads = AtomicBiInteger.getHi(counts);
            if (threads == Integer.MIN_VALUE)
                throw new RejectedExecutionException(job.toString());

            // Get the number of truly idle threads. This count is reduced by the
            // job queue size so that any threads that are idle but are about to take
            // a job from the queue are not counted.
            int idle = AtomicBiInteger.getLo(counts);

            // Start a thread if we have insufficient idle threads to meet demand
            // and we are not at max threads.
            startThread = (idle <= 0 && threads < _maxThreads) ? 1 : 0;

            // The job will be run by an idle thread when available
            if (!_counts.compareAndSet(counts, threads + startThread, idle + startThread - 1))
                continue;

            break;
        }

        if (!_jobs.offer(job))
        {
            // reverse our changes to _counts.
            if (addCounts(-startThread, 1 - startThread))
                LOG.warn("{} rejected {}", this, job);
            throw new RejectedExecutionException(job.toString());
        }

        if (LOG.isDebugEnabled())
            LOG.debug("queue {} startThread={}", job, startThread);

        // Start a thread if one was needed
        while (startThread-- > 0)
            startThread();
    }
    public Executor doGetTp(WebServer webServer) {
        JettyWebServer jettyWebServer = (JettyWebServer) webServer;
        return jettyWebServer.getServer().getThreadPool();
    }
spring:
  dynamic:
    tp:
      // 其他配置项
      jettyTp:       # jetty web server 线程池配置
        min: 100     # 核心线程数
        max: 400     # 最大线程数

Undertow 内部线程池的实现

    public Executor doGetTp(WebServer webServer) {

        UndertowWebServer undertowWebServer = (UndertowWebServer) webServer;
        Field undertowField = ReflectionUtils.findField(UndertowWebServer.class, "undertow");
        if (Objects.isNull(undertowField)) {
            return null;
        }
        ReflectionUtils.makeAccessible(undertowField);
        Undertow undertow = (Undertow) ReflectionUtils.getField(undertowField, undertowWebServer);
        if (Objects.isNull(undertow)) {
            return null;
        }
        return undertow.getWorker();
    }
spring:
  dynamic:
    tp:
      // 其他配置项
      undertowTp:   # undertow web server 线程池配置
        coreWorkerThreads: 100  # worker 核心线程数
        maxWorkerThreads: 400   # worker 最大线程数
        workerKeepAlive: 60     # 空闲线程超时时间

总结

以上介绍了 Tomcat 、Jetty 、Undertow 三大 WebServer 内置线程池的一些情况,重点介绍了 Tomcat 的,篇幅有限,其他两个感兴趣可以自己分析,原理都差不多。同时也介绍了基于 DynamicTp 怎么动态调整线程池的参数,当我们做 WebServer 性能调优时,能动态调整参数真的是非常好用的。

再次欢迎大家使用 DynamicTp 框架,一起完善项目。

下篇文章打算分享一个 DynamicTp 使用过程中因为 Tomcat 版本不一致导致的监控线程 halt 住的奇葩问题,通过一个问题来掌握 ScheduledExecutorService 的原理,欢迎大家持续关注。


联系我

欢迎加我微信或者关注公众号交流,一起变强!

公众号:CodeFox

微信:yanhom1314

1227 次点击
所在节点    程序员
0 条回复

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/849395

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX