spring-integration - 优雅地关闭 spring 集成流程
问题描述
我有一个 spring 集成应用程序,我想优雅地关闭它。该应用程序在 docker 容器中运行,我想要关闭的流程将大量 XML 文件从一个外部系统传输到另一个系统。
要求是:如果应用程序必须关闭,则当前文件传输应完成,之后不应再触摸其他文件。
到目前为止我所学和所做的: - docker stop 向容器主进程发送一个 SIGTERM,然后在 10 秒后发送一个 SIGKILL(可使用 --time=x 选项配置) - 我实现了一个 ApplicationListener 并将其注册为 @Bean ,因此它将在应用程序上下文中注册。- Flow 使用带有 transactionManager 的轮询器,因此 ApplicationListener 可以确定轮询器是否有打开的事务,如果是,则 Listener 等待一段时间。
我现在的问题是:使用此解决方案,我可以等到当前文件传输完成,但我无法告诉流程停止读取入站文件。如果传输完成并且在 ApplicationListener 等待时另一个文件到达,流将抓取文件并开始另一个传输,这可能会在 SIGKILL 到达时放弃。作为 Lifecycle 的流注入和对 stop() 的调用似乎并没有像我想象的那样工作。
我的问题是,有没有办法告诉 Flow,他应该完成他的工作,但不应该收听任何到达的消息?
到目前为止,这是我的代码:OutboundFlow:
@Bean
public PseudoTransactionManager transactionManager() {
return new PseudoTransactionManager();
}
@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
final ExpressionEvaluatingTransactionSynchronizationProcessor processor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
processor.setBeanFactory(beanFactory);
return new DefaultTransactionSynchronizationFactory(processor);
}
@Bean
public PollerSpec orderOutboundFlowTempFileInPoller() {
return Pollers
.fixedDelay(pollerDelay)
.maxMessagesPerPoll(100)
.transactional(transactionManager())
.transactionSynchronizationFactory(transactionSynchronizationFactory());
}
@Bean
public IntegrationFlow orderOutboundFlowTempFileIn() {
return IntegrationFlows
.from(Files.inboundAdapter(new File(temporaryPath + '/' + OrderUtils.SUBDIR_TMP_ORDER))
.filterFunction(
f -> OrderUtils.fileInputFilter(f, partnerConfigRepo, "orderOutboundFlowTempFileIn")),
e -> e.poller(orderOutboundFlowTempFileInPoller())) ...
应用:
public static void main(final String[] args) throws Exception {
SpringApplication.run(App.class, args);
}
@Bean
public GracefulShutdown gracefulShutdown() {
return new GracefulShutdown();
}
private static class GracefulShutdown implements ApplicationListener<ContextClosedEvent> {
private static final Logger LOG = LoggerFactory.getLogger(GracefulShutdown.class);
@Autowired
private Lifecycle orderOutboundFlowTempFileIn;
@Override public void onApplicationEvent(ContextClosedEvent event) {
LOG.info("Trying to gracefully shutdown App");
ApplicationContext context = event.getApplicationContext();
PollerSpec outboundFlowTempFileInPoller = context.getBean(PollerSpec.class, "orderOutboundFlowTempFileInPoller");
orderOutboundFlowTempFileIn.stop();
TransactionInterceptor transactionManager = (TransactionInterceptor) (outboundFlowTempFileInPoller.get()
.getAdviceChain()
.iterator().next());
if (transactionManager.getTransactionManager() instanceof AbstractPlatformTransactionManager) {
final TransactionStatus transaction = transactionManager.getTransactionManager().getTransaction(null);
LOG.info("This is the transaction: " + transaction.toString() + ", isActive? " + !transaction.isCompleted());
while (!transaction.isCompleted()) {
try {
LOG.info("Still active, waiting 30 more seconds");
Thread.sleep(30000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
LOG.info("Transaction completed");
}
}
}
解决方案
您实际上只需SourcePollingChannelAdapter
为Files.inboundAdapter()
. 为此,您需要向 lambda 添加.id()
一个e
。SourcePollingChannelAdapter
并在需要停止它时使用该 id 来检索它。
通过这种方式,您可以立即停止接收新文件,而那些正在运行的文件将会正确完成。
没有理由从这里停止整个流程,因为所有下游组件都是被动的。
推荐阅读
- azure - azure 应用服务身份验证模块是否支持 azure aad b2c?
- elasticsearch - elasticsearch Basic 许可证中每个集群的最大节点数是多少?
- c# - 我想创建一个共享数据库 c# windows 窗体应用程序?
- bash - $RANDOM 从函数返回时保持不变
- sorting - 是否可以在threejs中手动对对象进行排序?
- c# - Linq 复杂内连接
- spring-boot - 在 RabbitMQ 中使用 @StreamListener 从两个不同的队列中消费多条消息
- excel-2010 - 如何让 Excel 根据表中的最大值返回一个值?
- r - 在调试已编译的 C/C++ 代码 (GDB) 期间,在全局环境中列出 R 变量名称及其 SEXP
- python - 如何在 Python 函数中正确地对代码进行单元测试