首页 > 解决方案 > 用自己的 Executor 替换默认的 SimpleAsyncTaskExecutor 有什么缺点和风险

问题描述

个人知识:我从javacodegeeks 中读到:“...... SimpleAsyncTaskExecutor对于玩具项目来说还可以,但对于任何比这更大的项目,它都有点冒险,因为它不限制并发线程并且不重用线程。所以为了安全起见,我们还将添加一个任务执行器 bean..." 以及来自baeldung的一个非常简单的示例,如何添加我们自己的任务执行器。但是我可以找到任何指导来解释后果和一些值得应用的案例。

个人愿望:我正在努力为我们的微服务日志提供一个企业架构,以便在 Kafka 主题上发布。主要针对基于日志的我的情况,“不限制并发线程并且不重用它造成的风险”这句话似乎是合理的。

我在本地桌面上成功运行了波纹管代码,但我想知道我是否正确地提供了自定义任务执行器。

我的问题:考虑到我已经在使用 kafkatempla(即同步、单例和线程安全,至少据了解,至少用于生成/发送消息),此配置是否真的朝着正确的方向重用线程并避免意外传播使用 SimpleAsyncTaskExecutor 时创建线程?

生产者配置

@EnableAsync
@Configuration
public class KafkaProducerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${kafka.brokers}")
    private String servers;

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("KafkaMsgExecutor-");
        executor.initialize();
        return executor;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

}

制片人

@Service
public class Producer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Async
    public void send(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(final SendResult<String, String> message) {
                LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(final Throwable throwable) {
                LOGGER.error("unable to send message= " + message, throwable);
            }
        });
    }
}

用于演示目的:

@SpringBootApplication
public class KafkaDemoApplication  implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);

    }

    @Autowired
    private Producer p;

    @Override
    public void run(String... strings) throws Exception {
        p.send("test", " qualquer messagem demonstrativa");
    }

}

标签: javaspringmultithreadingspring-bootapache-kafka

解决方案


这是默认实现SimpleAsyncTaskExecutor

protected void doExecute(Runnable task) {
    Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
    thread.start();
}

为每个任务创建新线程,在 Java 中创建线程并不便宜:(参考

线程对象使用大量内存,并且在大型应用程序中,分配和释放许多线程对象会产生大量内存管理开销。

=> 用这个任务执行器重复执行任务会对应用程序性能产生负面影响(而且这个执行器默认不限制并发任务的数量)

这就是为什么建议您使用线程池实现的原因,线程创建开销仍然存在,但由于线程被重用而不是 create-fire-forget 而显着减少。

配置时ThreadPoolTaskExecutor,应根据您的应用程序负载正确定义两个值得注意的参数:

  1. private int maxPoolSize = Integer.MAX_VALUE;

    这是池中的最大线程数。

  2. private int queueCapacity = Integer.MAX_VALUE;

    这是排队的最大任务数。当队列已满时,默认值可能会导致 OutOfMemory 异常。

使用默认值 ( Integer.MAX_VALUE) 可能会导致服务器资源不足/崩溃。

您可以通过增加最大池大小的数量来提高吞吐量setMaxPoolSize(),以减少负载增加时的预热,将核心池大小设置为更高的值setCorePoolSize()(负载增加时将启动不同数量的线程maxPoolSize - corePoolSize


推荐阅读