java - 当具有固定线程池的执行器服务与 Kafka 侦听器一起使用时,线程将从队列中删除
问题描述
当我尝试将 executorService(50 个固定线程池)与 Kafka 一起使用时,一些线程正在从队列中删除并且没有得到处理。
例如,对于以下代码:日志中打印的行数:“内部 Kafka 侦听器”:3000“处理消息”:2100“发生异常”:0
@Named
public class KafkaListener {
ExecutorService executorService = Executors.newFixedThreadPool(50);
@Inject
KafkaMessageProcessor kafkaMessageProcessor;
@KafkaListener(
topics = "topicName",
containerFactory = "filterKafkaListenerContainerFactory")
public void listen(String message) {
try {
System.out.println("Inside Kafka Listener " + message);
executorService.submit(() -> {
System.out.println("Processing message");
kafkaMessageProcessor.process(message);
});
} catch (Exception e) {
System.out.println("Exception occurred :" + e);
}
}
}
我希望日志中的行数等于 newFixedThreadPool 使用 Blocking Queue with size Integer.MAX_VALUE
。
解决方案
推荐阅读
- python - 使用 Python 从 PE 文件中提取软件签名证书
- reactjs - html标签“/href”和navlink或Link之间的区别
- postgresql - postgres 函数在调用 varchar 的 insted 时采用参数 bytea
- git - 给定两个任意提交,如何检测 git merge 是否会导致空提交?
- gdb - 在 gdb 中调试版本(无符号),但使用源构建文件夹(.o 文件)?
- python - self 之后什么时候需要参数
- kubernetes - 停止 k8s initContainers 卷覆盖容器文件夹
- github - 标记和发布资产 url 的 Github 列表
- java - 如何在图像上制作流行艺术照片滤镜
- php - 处理 HTML 模板以通过 AJAX 动态更新页面