首页 > 解决方案 > 优雅地关闭 spring 集成流程

问题描述

我有一个 spring 集成应用程序,我想优雅地关闭它。该应用程序在 docker 容器中运行,我想要关闭的流程将大量 XML 文件从一个外部系统传输到另一个系统。

要求是:如果应用程序必须关闭,则当前文件传输应完成,之后不应再触摸其他文件。

到目前为止我所学和所做的: - docker stop 向容器主进程发送一个 SIGTERM,然后在 10 秒后发送一个 SIGKILL(可使用 --time=x 选项配置) - 我实现了一个 ApplicationListener 并将其注册为 @Bean ,因此它将在应用程序上下文中注册。- Flow 使用带有 transactionManager 的轮询器,因此 ApplicationListener 可以确定轮询器是否有打开的事务,如果是,则 Listener 等待一段时间。

我现在的问题是:使用此解决方案,我可以等到当前文件传输完成,但我无法告诉流程停止读取入站文件。如果传输完成并且在 ApplicationListener 等待时另一个文件到达,流将抓取文件并开始另一个传输,这可能会在 SIGKILL 到达时放弃。作为 Lifecycle 的流注入和对 stop() 的调用似乎并没有像我想象的那样工作。

我的问题是,有没有办法告诉 Flow,他应该完成他的工作,但不应该收听任何到达的消息?

到目前为止,这是我的代码:OutboundFlow:

  @Bean
  public PseudoTransactionManager transactionManager() {
    return new PseudoTransactionManager();
  }

  @Bean
  public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    final ExpressionEvaluatingTransactionSynchronizationProcessor processor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
    processor.setBeanFactory(beanFactory);
    return new DefaultTransactionSynchronizationFactory(processor);
  }


  @Bean
  public PollerSpec orderOutboundFlowTempFileInPoller() {
    return Pollers
        .fixedDelay(pollerDelay)
        .maxMessagesPerPoll(100)
        .transactional(transactionManager())
        .transactionSynchronizationFactory(transactionSynchronizationFactory());
  }

  @Bean
  public IntegrationFlow orderOutboundFlowTempFileIn() {
    return IntegrationFlows
        .from(Files.inboundAdapter(new File(temporaryPath + '/' + OrderUtils.SUBDIR_TMP_ORDER))
                .filterFunction(
                    f -> OrderUtils.fileInputFilter(f, partnerConfigRepo, "orderOutboundFlowTempFileIn")),
            e -> e.poller(orderOutboundFlowTempFileInPoller())) ...

应用:

  public static void main(final String[] args) throws Exception {
    SpringApplication.run(App.class, args);
  }

  @Bean
  public GracefulShutdown gracefulShutdown() {
    return new GracefulShutdown();
  }

  private static class GracefulShutdown implements ApplicationListener<ContextClosedEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(GracefulShutdown.class);

    @Autowired
    private Lifecycle orderOutboundFlowTempFileIn;

    @Override public void onApplicationEvent(ContextClosedEvent event) {
      LOG.info("Trying to gracefully shutdown App");
      ApplicationContext context = event.getApplicationContext();
      PollerSpec outboundFlowTempFileInPoller = context.getBean(PollerSpec.class, "orderOutboundFlowTempFileInPoller");

      orderOutboundFlowTempFileIn.stop();
      TransactionInterceptor transactionManager = (TransactionInterceptor) (outboundFlowTempFileInPoller.get()
          .getAdviceChain()
          .iterator().next());
      if (transactionManager.getTransactionManager() instanceof AbstractPlatformTransactionManager) {
        final TransactionStatus transaction = transactionManager.getTransactionManager().getTransaction(null);
        LOG.info("This is the transaction: " + transaction.toString() + ", isActive? " + !transaction.isCompleted());
        while (!transaction.isCompleted()) {
          try {
            LOG.info("Still active, waiting 30 more seconds");
            Thread.sleep(30000);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
        LOG.info("Transaction completed");
      }
    }
  }

标签: spring-integration

解决方案


您实际上只需SourcePollingChannelAdapterFiles.inboundAdapter(). 为此,您需要向 lambda 添加.id()一个eSourcePollingChannelAdapter并在需要停止它时使用该 id 来检索它。

通过这种方式,您可以立即停止接收新文件,而那些正在运行的文件将会正确完成。

没有理由从这里停止整个流程,因为所有下游组件都是被动的。


推荐阅读