java - Spring Integration:仅在处理完上次轮询的所有结果后才启动 JPA 轮询
问题描述
我想使用 Spring Integration Java DSL 实现以下流程:
- 每 2 小时轮询一次数据库中的表,该表返回需要处理的文档的 id
- 对于每个 id,通过一个 HTTP 网关处理一个文档
- 将响应存储在数据库中
我有一个可以执行这些步骤的有效 Java 代码。我正在努力解决的另一个要求是,在上次轮询的所有文档都已处理并存储在数据库中之前,不应进行下一轮文档的轮询。
我可以将 Spring Integration 中的任何模式用于此附加要求吗?
这是一个简化的代码 - 它会变得更加复杂,我会将文档的处理(HTTP 出站和持久化)拆分为单独的类/流:
return IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
.entityClass(ProcessingMetadata.class)
.jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
"where p.status = com.test.ProcessingStatus.PROCESSED")
.maxResults(1)
.expectSingleResult(true),
e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
.handle(Jpa.retrievingGateway(this.sourceEntityManagerFactory)
.entityClass(DocumentHeader.class)
.jpaQuery("from DocumentHeader d where d.modified > :modified")
.parameterExpression("modified", "payload"))
.handle(Http.outboundGateway(uri)
.httpMethod(HttpMethod.POST)
.expectedResponseType(String.class))
.handle(Jpa.outboundAdapter(this.targetEntityManagerFactory)
.entityClass(ProcessingMetadata.class)
.persistMode(PersistMode.PERSIST),
e -> e.transactional(true))
.get();
更新
按照 Artem 的建议,我正在尝试使用SimpleActiveIdleMessageSourceAdvice来实现它
class WaitUntilCompleted extends SimpleActiveIdleMessageSourceAdvice {
public WaitUntilCompleted(DynamicPeriodicTrigger trigger) {
super(trigger);
}
@Override
public boolean beforeReceive(MessageSource<?> source) {
return false;
}
}
如果我理解正确,上面的代码将停止轮询。现在我不知道如何将此 Advice 附加到Jpa.inboundAdapter ...它似乎没有合适的方法(既没有 Advice 也没有 Spec Handler)。我在这里错过了一些明显的东西吗?我已经尝试将 Advice 附加到Jpa.retrievingGateway但它根本不会改变流程。
更新2
检查这个问题以获得完整的解决方案:Spring Integration:如何对建议进行单元测试
解决方案
我今天回答了类似的问题:在 Spring Integration 中完成下游流程后,如何一次从队列 1 消息中进行轮询。
你也可能在数据库级别有一个技巧,不要让在其他人被锁定时看到表中的新记录。或者您可以UPDATE
在流程结束时获得一些,而在SELECT
它们分别更新之前您不会看到适当的记录。
但无论如何,我为这个问题建议的任何方法也应该在这里应用。
此外,您确实可以考虑依赖 ,SimpleActiveIdleMessageSourceAdvice
因为您的解决方案已经基于MessageSource
实现。
更新
对于您的用例,最好扩展它SimpleActiveIdleMessageSourceAdvice
并覆盖它beforeReceive()
以检查您是否能够读取更多数据的某些状态。idlePollPeriod
和可能是相同的activePollPeriod
值:在两者之间更改它看起来没有意义,因为您在读取下一组数据后将进入空闲状态。
为了检查状态,它实际上可能是一个简单的AtomicBoolean
bean,您应该在处理当前文档集后对其进行更改。这可能是在聚合器或您可以在解决方案中使用的任何其他内容之后。
更新 2
要为您使用 a WaitUntilCompleted
,您Jpa.inboundAdapter
应该具有如下配置:
IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
.entityClass(ProcessingMetadata.class)
.jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
"where p.status = com.test.ProcessingStatus.PROCESSED")
.maxResults(1)
.expectSingleResult(true),
e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10)).advice(waitUntilCompleted())))
注意.advice(waitUntilCompleted())
which 是 pller 配置的一部分并指向您的建议 bean。
推荐阅读
- javascript - 根据 url 或 ID 加载 div
- bash - Go中带参数的exec命令?
- php - 在 php 代码中运行嵌入 360 度照片的框架
- vb.net - vb.net改变对应checkbox.checked = true的picturebox的背景颜色
- javascript - v-for json 中的切片对象 - Vue 警告:TypeError:无法读取未定义的属性“切片”
- python - 如何将以下代码转换为 python 代码,用于校验和。请有人可以给我一些方向来学习一些东西来实现这一目标
- c# - 如何在方法中添加运算符增量(++)、减量(--)和或(||):AddTokenRule
- angular - 如何仅在 Angular 5 应用程序中启用提交时所需的文本字段?
- powerbi - Python Visual 脚本中的 Power BI 转储/打印变量
- sql-server - 如果员工的薪水高达 5000 则排名 =1,高达 100k 则排名 = 2,依此类推。我们可以使用 rank 函数得到结果吗?