java - 用自己的 Executor 替换默认的 SimpleAsyncTaskExecutor 有什么缺点和风险
问题描述
个人知识:我从javacodegeeks 中读到:“...... SimpleAsyncTaskExecutor对于玩具项目来说还可以,但对于任何比这更大的项目,它都有点冒险,因为它不限制并发线程并且不重用线程。所以为了安全起见,我们还将添加一个任务执行器 bean..." 以及来自baeldung的一个非常简单的示例,如何添加我们自己的任务执行器。但是我可以找到任何指导来解释后果和一些值得应用的案例。
个人愿望:我正在努力为我们的微服务日志提供一个企业架构,以便在 Kafka 主题上发布。主要针对基于日志的我的情况,“不限制并发线程并且不重用它造成的风险”这句话似乎是合理的。
我在本地桌面上成功运行了波纹管代码,但我想知道我是否正确地提供了自定义任务执行器。
我的问题:考虑到我已经在使用 kafkatempla(即同步、单例和线程安全,至少据了解,至少用于生成/发送消息),此配置是否真的朝着正确的方向重用线程并避免意外传播使用 SimpleAsyncTaskExecutor 时创建线程?
生产者配置
@EnableAsync
@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${kafka.brokers}")
private String servers;
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("KafkaMsgExecutor-");
executor.initialize();
return executor;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
}
制片人
@Service
public class Producer {
private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Async
public void send(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(final SendResult<String, String> message) {
LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
}
@Override
public void onFailure(final Throwable throwable) {
LOGGER.error("unable to send message= " + message, throwable);
}
});
}
}
用于演示目的:
@SpringBootApplication
public class KafkaDemoApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
@Autowired
private Producer p;
@Override
public void run(String... strings) throws Exception {
p.send("test", " qualquer messagem demonstrativa");
}
}
解决方案
这是默认实现SimpleAsyncTaskExecutor
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
为每个任务创建新线程,在 Java 中创建线程并不便宜:(参考)
线程对象使用大量内存,并且在大型应用程序中,分配和释放许多线程对象会产生大量内存管理开销。
=> 用这个任务执行器重复执行任务会对应用程序性能产生负面影响(而且这个执行器默认不限制并发任务的数量)
这就是为什么建议您使用线程池实现的原因,线程创建开销仍然存在,但由于线程被重用而不是 create-fire-forget 而显着减少。
配置时ThreadPoolTaskExecutor
,应根据您的应用程序负载正确定义两个值得注意的参数:
private int maxPoolSize = Integer.MAX_VALUE
;这是池中的最大线程数。
private int queueCapacity = Integer.MAX_VALUE;
这是排队的最大任务数。当队列已满时,默认值可能会导致 OutOfMemory 异常。
使用默认值 ( Integer.MAX_VALUE
) 可能会导致服务器资源不足/崩溃。
您可以通过增加最大池大小的数量来提高吞吐量setMaxPoolSize()
,以减少负载增加时的预热,将核心池大小设置为更高的值setCorePoolSize()
(负载增加时将启动不同数量的线程maxPoolSize - corePoolSize
)
推荐阅读
- amazon-web-services - 如何在 AWS Serverless Application Repository 中发布嵌套堆栈
- reactjs - 反应:默认上下文值为空白
- nginx - mikrotik 在 dst-nat 中更改 src 标头 ip
- css - 检查输入后文本的css选择器
- php - 如果求和时任一侧的单个数组中的值相等,则查找索引
- java - 将高级、简单搜索、列表视图添加到 WCMS 后台资源管理器树 Sap Hybris
- django - 更新自定义用户 DRF 最佳实践的 ManyToMany
- loopbackjs - LB4:授权提供者上下文有空主体
- assembly - 使用 16 位汇编器 NASM 的 3 的倍数
- sql - 检查是否在 SQL 的多个列之一中找到字符串