线程池,我是谁?我在哪儿?

2022-07-18 10:38:10 +08:00
 yanhomlin

大家好,这篇文章跟大家探讨下日常使用线程池的各种姿势,重点介绍怎么在 Spring 环境中正确使用线程池。

线程池使用姿势

首先问大家一个问题,你日常开发中是怎样使用线程池的?

我想大致可以分为以下四种情况:

1.方法级,随用随建,用完关闭

2.类级共享,定义个 static final 修饰的 ThreadPoolExecutor ,该类及子类(看修饰符)所有对象、方法共享

3.业务共享,按业务类型定义多个 ThreadPoolExecutor ,相同业务类型共享同一线程池对象

4.全局共享,服务所有地方共享同一全局线程池

一般来说,优先使用方式 3 ,其次方式 2 ,不要使用方式 1 跟 4 ,原因如下

1.线程池出现的目的就是为了统一管理线程资源,减少频繁创建销毁线程带来的开销,使用池化技术复用线程执行任务,提升系统性能,在高并发、异步化的场景下,方法级使用根本达不到此目的,反而会使性能变低。

2.全局共享一个线程池,任务执行参差不齐,相互影响,高耗时任务会占满线程池资源,导致低耗时任务没机会执行;同时如果任务之间存在父子关系,可能会导致死锁的发生,进而引发 OOM 。

3.按业务类型进行线程池隔离,各任务执行互不影响,粒度也比类级共享大点,不会创建大量线程池,降低系统调度压力,像 Hystrix 线程池隔离就可以理解成这种模式。

综上,建议大家都采用方式 3 ,按业务功能分类定义线程池。

Spring 项目中使用 ThreadPoolExecutor

Spring 作为一个 Bean 容器,我们通常会将业务中用到的 ThreadPoolExecutor 注册到 Spring 容器中,同时 Spring 在容器刷新的时候会注入相应的 ThreadPoolExecutor 对象 到我们的业务 Bean 中,然后就可以直接使用了,比如定义如下( ThreadPoolBuilder 是封装的一个建造者模式实现):

@Configuration
public class ThreadPoolConfiguration {
    
    @Bean
    public ThreadPoolExecutor jobExecutor() {
        return ThreadPoolBuilder.newBuilder()
                .corePoolSize(10)
                .maximumPoolSize(15)
                .keepAliveTime(15000)
                .timeUnit(TimeUnit.MILLISECONDS)
                .workQueue(LINKED_BLOCKING_QUEUE.getName(), 3000)
                .build();
    }

    @Bean
    public ThreadPoolExecutor remotingExecutor() {
        return ThreadPoolBuilder.newBuilder()
                .corePoolSize(10)
                .maximumPoolSize(15)
                .keepAliveTime(15000)
                .timeUnit(TimeUnit.MILLISECONDS)
                .workQueue(SYNCHRONOUS_QUEUE.getName(), null)
                .build();
    }

    @Bean
    public ThreadPoolExecutor consumeExecutor() {
        return ThreadPoolBuilder.newBuilder()
                .corePoolSize(10)
                .maximumPoolSize(15)
                .keepAliveTime(15000)
                .timeUnit(TimeUnit.MILLISECONDS)
                .workQueue(LINKED_BLOCKING_QUEUE.getName(), 5000)
                .build();
    }
}

以上按使用场景定义了三个线程池实例,一个用来执行耗时的定时任务、一个用来执行远程 RPC 调用、一个用来执行 Mq 消费。

这样使用 ThreadPoolExecutor 有个问题,Spring 容器关闭的时候可能任务队列里的任务还没处理完,有丢失任务的风险。

我们知道 Spring 中的 Bean 是有生命周期的,如果 Bean 实现了 Spring 相应的生命周期接口( InitializingBean 、DisposableBean 接口),在 Bean 初始化、容器关闭的时候会调用相应的方法来做相应处理。

所以建议最好不要直接使用 ThreadPoolExecutor 在 Spring 环境中,可以使用 Spring 提供的 ThreadPoolTaskExecutor ,或者 DynamicTp 框架提供的 DtpExecutor 线程池实现

一些 Spring 知识

这里分享一个源码阅读技巧,就是开源项目和 Spring 整合时,很多同学不知从何入手阅读源码。

我们知道 Spring 提供了很多的扩展点,第三方框架整合 Spring 其实大多也都是基于这些扩展接口来做的,所以我们可以从这些扩展接口入手,断点调试,一步步深入框架内核。

这些扩展包括但不限于以下接口:

BeanFactoryPostProcessor:在 Bean 实例化之前对 BeanDefinition 进行修改

BeanPostProcessor:在 Bean 初始化前后对 Bean 进行一些修改包装增强,比如返回代理对象

Aware:一个标记接口,实现该接口及子接口的类会收到 Spring 的通知回调,赋予某种 Spring 框架的能力,比如 ApplicationContextAware 、EnvironmentAware 等

ApplicationContextInitializer:在上下文准备阶段,容器刷新之前做一些初始化工作,比如我们常用的配置中心 client 基本都是继承该初始化器,在容器刷新前将配置从远程拉到本地,然后封装成 PropertySource 放到 Environment 中供使用

ApplicationListener:Spring 事件机制,监听特定的应用事件( ApplicationEvent ),观察者模式的一种实现

FactoryBean:用来自定义 Bean 的创建逻辑( Mybatis 、Feign 等等)

ImportBeanDefinitionRegistrar:定义 @EnableXXX 注解,在注解上 Import 了一个 ImportBeanDefinitionRegistrar ,实现注册 BeanDefinition 到容器中

ApplicationRunner/CommandLineRunner:容器启动后回调,执行一些初始化工作

上述列出了几个比较常用的接口,但是 Spring 扩展远不于此,还有很多扩展接口大家可以自己去了解。

DynamicTp 生成线程池对象

DynamicTp 框架内部定义了 DtpExecutor 线程池类,其继承关系如下:

EagerDtpExecutor:参考 Tomcat 线程池设计,调整了下线程池的执行流程,优先创建线程执行任务而不是放入队列中,主要用于 IO 密集型场景,继承 DtpExecutor

DtpExecutor:重写了 ThreadPoolExecutor 的 execute 方法、beforeExecute 方法、afterExecute 方法,主要做任务包装、执行超时、等待超时记录等,继承 DtpLifecycleSupport

DtpLifecycleSupport:实现了 Spring 中的 InitializingBean, DisposableBean 接口,在 Bean 初始化、Spring 容器销毁时执行相应的逻辑,destroy 方法逻辑如下:

    @Override
    public void destroy() {
        internalShutdown();
    }

    public void internalShutdown() {
        if (log.isInfoEnabled()) {
            log.info("Shutting down ExecutorService, poolName: {}", threadPoolName);
        }
  
        if (this.waitForTasksToCompleteOnShutdown) {
            // 如果需要等待任务执行完毕,则调用 shutdown()会执行先前已提交的任务,拒绝新任务提交,线程池状态变成 SHUTDOWN
            this.shutdown();
        } else {
            // 如果不需要等待任务执行完毕,则直接调用 shutdownNow()方法,尝试中断正在执行的任务,返回所有未执行的任务,线程池状态变成 STOP , 然后调用 Future 的 cancel 方法取消
            for (Runnable remainingTask : this.shutdownNow()) {
                cancelRemainingTask(remainingTask);
            }
        }
        awaitTerminationIfNecessary();
    }

    protected void cancelRemainingTask(Runnable task) {
        if (task instanceof Future) {
            ((Future<?>) task).cancel(true);
        }
    }

    private void awaitTerminationIfNecessary() {
        if (this.awaitTerminationSeconds <= 0) {
            return;
        }
        try {
            // 配合 shutdown 使用,阻塞当前线程,等待已提交的任务执行完毕或者超时
            if (!awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS) && log.isWarnEnabled()) {
                log.warn("Timed out while waiting for executor {} to terminate", threadPoolName);
            }
        } catch (InterruptedException ex) {
            if (log.isWarnEnabled()) {
                log.warn("Interrupted while waiting for executor {} to terminate", threadPoolName);
            }
            Thread.currentThread().interrupt();
        }
    }

DynamicTp 框架在整合 Spring 的时候,也是用到了上述说的扩展接口。

扩展 1

    @Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(DtpBeanDefinitionRegistrar.class)
    public @interface EnableDynamicTp {
    }

使用过 DynamicTp 的小伙伴应该知道需要在启动类加 @EnableDynamicTp 注解,该注解其实就用到了 ImportBeanDefinitionRegistrar 扩展,主要代码如下:

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {

        DtpProperties dtpProperties = new DtpProperties();
        // 将环境变量中的线程池相关配置绑定到 DtpProperties 对象上
        PropertiesBinder.bindDtpProperties(environment, dtpProperties);
        val executors = dtpProperties.getExecutors();
        if (CollUtil.isEmpty(executors)) {
            log.warn("DynamicTp registrar, no executors are configured.");
            return;
        }

        executors.forEach(x -> {
            // 判断线程池类型( common or eager )
            Class<?> executorTypeClass = ExecutorType.getClass(x.getExecutorType());
            String beanName = x.getThreadPoolName();
            // 线程池对象属性
            Map<String, Object> properties = buildProperties(x);
            // 构造器参数
            Object[] args = buildArgs(executorTypeClass, x);
            BeanUtil.registerIfAbsent(registry, beanName, executorTypeClass, properties, args);
        });
    }

代码解读:

1.我们知道 ImportBeanDefinitionRegistrar 的实现是在 Spring 容器刷新的时候执行的,在此之前在上下文准备阶段已经从配置中心拉取到线程池配置放到环境变量里了,所以第一步我们将环境变量里的线程池相关配置绑定到 DtpProperties 对象上。

2.然后构造 BeanDefinitionBuilder 对象,设置构造函数参数、设置属性值,注册到 BeanDefinition 到 Spring 容器中

    public static void doRegister(BeanDefinitionRegistry registry,
                                  String beanName,
                                  Class<?> clazz,
                                  Map<String, Object> properties,
                                  Object... constructorArgs) {
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz);
        // 设置构造器参数,老八股文了
        for (Object constructorArg : constructorArgs) {
            builder.addConstructorArgValue(constructorArg);
        }
        
        // 设置属性及值的 KV 对,后续在 Bean populateBean 的时候会通过反射 set 方法赋值
        if (CollUtil.isNotEmpty(properties)) {
            properties.forEach(builder::addPropertyValue);
        }

        registry.registerBeanDefinition(beanName, builder.getBeanDefinition());
    }

3.Spring 容器刷新时会根据注册的 BeanDefinition 创建配置的线程池对象,初始化赋值,并注入到引用的 Bean 中。这样就不用在手动用 @Bean 声明线程池对象了,只需要在配置中心配置即可

扩展 2

DtpPostProcessor 继承 BeanPostProcessor ,在 Bean 初始化前后对 ThreadPoolExecutor 及其子类进行一些处理,主要用来获取线程池对象注册到 DynamicTp 框架内部定义的容器中(就个 Map )

public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        if (!(bean instanceof ThreadPoolExecutor)) {
            return bean;
        }

        if (bean instanceof DtpExecutor) {
            DtpExecutor dtpExecutor = (DtpExecutor) bean;
            if (bean instanceof EagerDtpExecutor) {
                ((TaskQueue) dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor) dtpExecutor);
            }
            registerDtp(dtpExecutor);
            return dtpExecutor;
        }

        ApplicationContext applicationContext = ApplicationContextHolder.getInstance();
        DynamicTp dynamicTp;
        try {
            dynamicTp = applicationContext.findAnnotationOnBean(beanName, DynamicTp.class);
            if (dynamicTp == null) {
                return bean;
            }
        } catch (NoSuchBeanDefinitionException e) {
            log.error("There is no bean with the given name {}", beanName, e);
            return bean;
        }

        String poolName = StringUtils.isNotBlank(dynamicTp.value()) ? dynamicTp.value() : beanName;
        registerCommon(poolName, (ThreadPoolExecutor) bean);
        return bean;
    }

扩展 3

ApplicationListener 主要用来解耦逻辑,发布监听事件,core 模块跟 adapter 模块通信主要就用该扩展,以及框架会监听 Spring 容器启动的各阶段事件,做相应的逻辑处理

public abstract class AbstractDtpHandleListener implements GenericApplicationListener {

    @Override
    public boolean supportsEventType(ResolvableType resolvableType) {
        Class<?> type = resolvableType.getRawClass();
        if (type != null) {
            return RefreshEvent.class.isAssignableFrom(type) ||
                    CollectEvent.class.isAssignableFrom(type) ||
                    AlarmCheckEvent.class.isAssignableFrom(type);
        }
        return false;
    }

    @Override
    public void onApplicationEvent(@NonNull ApplicationEvent event) {
        try {
            if (event instanceof RefreshEvent) {
                doRefresh(((RefreshEvent) event).getDtpProperties());
            } else if (event instanceof CollectEvent) {
                doCollect(((CollectEvent) event).getDtpProperties());
            } else if (event instanceof AlarmCheckEvent) {
                doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties());
            }
        } catch (Exception e) {
            log.error("DynamicTp adapter, event handle failed.", e);
        }
    }
}

扩展 4

ApplicationRunner ,等 Spring 容器启动后,会调用该钩子函数,执行一些初始化操作,DtpMonitor 、DtpRegistry 等都用到了该扩展

所以 DynamicTp 的正确使用姿势,线程池只需在配置中心声明,然后服务启动时框架会基于 Spring 的这些扩展自动创建线程池对象注入到所需的 Bean 中,代码中不需要显示声明

再次介绍下 DynamicTp 框架

DynamicTp 是一个基于配置中心实现的轻量级动态线程池管理工具,主要功能可以总结为 动态调参、通知报警、运行监控、三方包线程池管理等几大类。

经过几个版本迭代,目前最新版本 v1.0.7 具有以下特性

特性

项目地址

目前累计 1.5k star ,感谢你的 star ,欢迎 pr ,业务之余一起给开源贡献一份力量

官网https://dynamictp.cn

gitee 地址https://gitee.com/dromara/dynamic-tp

github 地址https://github.com/dromara/dynamic-tp

1874 次点击
所在节点    程序员
3 条回复
siweipancc
2022-07-18 14:54:05 +08:00
b+1 bye
kisick
2022-07-18 21:23:21 +08:00
请问你是参考了美团的文章还是美团应用了你的框架?
bvbvb
2022-07-19 07:22:35 +08:00
微服务,协程

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/866923

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX