spring-integration - 如何使用 Spring Integration 设置 ThreadPool 来处理文件消息源?
问题描述
有人可以帮我用线程池重写这个流程吗?以下代码有效,但使用固定延迟来服务传入文件:
@Bean
public IntegrationFlow sampleFlow() {
return IntegrationFlows
.from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(500)))
.channel(new DirectChannel())
.transform(fileMessageToJobRequest())
.handle(springBatchJobLauncher())
.handle(jobExecution -> {
logger.info("jobExecution payload: {}", jobExecution.getPayload());
})
.get();
}
需要线程,因为文件的速度很快。
解决方案
谢谢@Artem。我根据 Artem 的回答找到了解决方案。诀窍是在下面的代码中使用 TaskExecutor。此外,应将 Pollers.maxMessagesPerPoll(nbfiles) 设置为一次处理多个消息(=文件)。
@Bean
public IntegrationFlow sampleFlow() throws InterruptedException {
return IntegrationFlows
.from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(5)))
.channel(MessageChannels.executor(threadPoolTaskExecutor()))
.transform(fileMessageToJobRequest())
.handle(springBatchJobLauncher())
.handle(jobExecution -> {
logger.debug("jobExecution payload: {}", jobExecution.getPayload());
})
.get();
}
@Bean
public TaskExecutor threadPoolTaskExecutor() {
int poolSize = 20;
logger.debug("...... createing ThreadPool of size {}.", poolSize);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("Dama_Thread_");
executor.setMaxPoolSize(5);
executor.setCorePoolSize(5);
executor.setQueueCapacity(22);
return executor;
}
推荐阅读
- python - 如何使用 BeautifulSoup 和 Selenium 实现 if 语句
- c - 需要了解C中的一些指针
- dgraph - Dgraph:在您的 Golang 结构字段中始终使用 omitempty 是最佳实践吗?
- oop - 对应用设计模式感到困惑
- sql - 如何在oracle sql查询中提取括号之间有几行的字符串
- c# - 文档尚未打开 使用水晶报表
- python - 使用python绘制带有文本的直方图
- reactjs - 任务:app:buildDebugStaticWebviewAssets FAILED
- c++ - 从实感相机中提取深度帧?
- regex - 没有匹配项时的 Pandas 正则表达式替换