首页 > 解决方案 > Spring Integration:仅在处理完上次轮询的所有结果后才启动 JPA 轮询

问题描述

我想使用 Spring Integration Java DSL 实现以下流程:

  1. 每 2 小时轮询一次数据库中的表,该表返回需要处理的文档的 id
  2. 对于每个 id,通过一个 HTTP 网关处理一个文档
  3. 将响应存储在数据库中

我有一个可以执行这些步骤的有效 Java 代码。我正在努力解决的另一个要求是,在上次轮询的所有文档都已处理并存储在数据库中之前,不应进行下一轮文档的轮询。

我可以将 Spring Integration 中的任何模式用于此附加要求吗?

这是一个简化的代码 - 它会变得更加复杂,我会将文档的处理(HTTP 出站和持久化)拆分为单独的类/流:

return IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
                .entityClass(ProcessingMetadata.class)
                .jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
                        "where p.status = com.test.ProcessingStatus.PROCESSED")
                .maxResults(1)
                .expectSingleResult(true),
        e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
        .handle(Jpa.retrievingGateway(this.sourceEntityManagerFactory)
                .entityClass(DocumentHeader.class)
                .jpaQuery("from DocumentHeader d where d.modified > :modified")
                .parameterExpression("modified", "payload"))
        .handle(Http.outboundGateway(uri)
                .httpMethod(HttpMethod.POST)
                .expectedResponseType(String.class))
        .handle(Jpa.outboundAdapter(this.targetEntityManagerFactory)
                        .entityClass(ProcessingMetadata.class)
                        .persistMode(PersistMode.PERSIST),
                e -> e.transactional(true))
        .get();

更新

按照 Artem 的建议,我正在尝试使用SimpleActiveIdleMessageSourceAdvice来实现它

class WaitUntilCompleted extends SimpleActiveIdleMessageSourceAdvice {

    public WaitUntilCompleted(DynamicPeriodicTrigger trigger) {
        super(trigger);
    }

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        return false;
    }
}

如果我理解正确,上面的代码将停止轮询。现在我不知道如何将此 Advice 附加到Jpa.inboundAdapter ...它似乎没有合适的方法(既没有 Advice 也没有 Spec Handler)。我在这里错过了一些明显的东西吗?我已经尝试将 Advice 附加到Jpa.retrievingGateway但它根本不会改变流程。

更新2

检查这个问题以获得完整的解决方案:Spring Integration:如何对建议进行单元测试

标签: javaspringspring-integrationspring-integration-dsl

解决方案


我今天回答了类似的问题:在 Spring Integration 中完成下游流程后,如何一次从队列 1 消息中进行轮询

你也可能在数据库级别有一个技巧,不要让在其他人被锁定时看到表中的新记录。或者您可以UPDATE在流程结束时获得一些,而在SELECT它们分别更新之前您不会看到适当的记录。

但无论如何,我为这个问题建议的任何方法也应该在这里应用。

此外,您确实可以考虑依赖 ,SimpleActiveIdleMessageSourceAdvice因为您的解决方案已经基于MessageSource实现。

更新

对于您的用例,最好扩展它SimpleActiveIdleMessageSourceAdvice并覆盖它beforeReceive()以检查您是否能够读取更多数据的某些状态。idlePollPeriod和可能是相同的activePollPeriod值:在两者之间更改它看起来没有意义,因为您在读取下一组数据后将进入空闲状态。

为了检查状态,它实际上可能是一个简单的AtomicBooleanbean,您应该在处理当前文档集后对其进行更改。这可能是在聚合器或您可以在解决方案中使用的任何其他内容之后。

更新 2

要为您使用 a WaitUntilCompleted,您Jpa.inboundAdapter应该具有如下配置:

IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
            .entityClass(ProcessingMetadata.class)
            .jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
                    "where p.status = com.test.ProcessingStatus.PROCESSED")
            .maxResults(1)
            .expectSingleResult(true),
    e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10)).advice(waitUntilCompleted())))

注意.advice(waitUntilCompleted())which 是 pller 配置的一部分并指向您的建议 bean。


推荐阅读