首页 > 解决方案 > 当具有固定线程池的执行器服务与 Kafka 侦听器一起使用时,线程将从队列中删除

问题描述

当我尝试将 executorService(50 个固定线程池)与 Kafka 一起使用时,一些线程正在从队列中删除并且没有得到处理。

例如,对于以下代码:日志中打印的行数:“内部 Kafka 侦听器”:3000“处理消息”:2100“发生异常”:0

@Named
public class KafkaListener {

    ExecutorService executorService = Executors.newFixedThreadPool(50);

    @Inject
    KafkaMessageProcessor kafkaMessageProcessor;

    @KafkaListener(
            topics = "topicName",
            containerFactory = "filterKafkaListenerContainerFactory")
    public void listen(String message) {
        try {
            System.out.println("Inside Kafka Listener " + message);
            executorService.submit(() -> {
                System.out.println("Processing message");
                kafkaMessageProcessor.process(message);
            });
        } catch (Exception e) {
            System.out.println("Exception occurred :" + e);
        }
    }
}

我希望日志中的行数等于 newFixedThreadPool 使用 Blocking Queue with size Integer.MAX_VALUE

标签: javamultithreadingapache-kafkaexecutorserviceexecutor

解决方案


推荐阅读