首页 > 解决方案 > 如何使用 Spring Integration 设置 ThreadPool 来处理文件消息源?

问题描述

有人可以帮我用线程池重写这个流程吗?以下代码有效,但使用固定延迟来服务传入文件:

@Bean
public IntegrationFlow sampleFlow() {
    return IntegrationFlows
            .from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(500)))
            .channel(new DirectChannel())
            .transform(fileMessageToJobRequest())
            .handle(springBatchJobLauncher())
            .handle(jobExecution -> {
                logger.info("jobExecution payload: {}", jobExecution.getPayload());
            })
            .get();
}

需要线程,因为文件的速度很快。

标签: spring-integrationspring-integration-dslspring-dsl

解决方案


谢谢@Artem。我根据 Artem 的回答找到了解决方案。诀窍是在下面的代码中使用 TaskExecutor。此外,应将 Pollers.maxMessagesPerPoll(nbfiles) 设置为一次处理多个消息(=文件)。

  @Bean
  public IntegrationFlow sampleFlow() throws InterruptedException {
    return IntegrationFlows
          .from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(5)))
          .channel(MessageChannels.executor(threadPoolTaskExecutor()))
          .transform(fileMessageToJobRequest())
          .handle(springBatchJobLauncher())
          .handle(jobExecution -> {
            logger.debug("jobExecution payload: {}", jobExecution.getPayload());
          })
          .get();
  }

  @Bean
  public TaskExecutor threadPoolTaskExecutor() {
    int poolSize = 20;
    logger.debug("...... createing ThreadPool of size {}.", poolSize);
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setThreadNamePrefix("Dama_Thread_");
    executor.setMaxPoolSize(5);
    executor.setCorePoolSize(5);
    executor.setQueueCapacity(22);
    return executor;
  }

推荐阅读