java - 如何使用 Spring Integration 和 MongoDB 实现消息队列?
问题描述
如何配置 Spring Integration,以便从集合中删除已处理的消息。在 MongoDB 控制台中,我可以简单地调用:
db.messages.findAndModify({ remove:true })
但在 MongoDbMessageSource 中只读取消息
mongoTemplate.find(..)
我想它可以通过在事务中进行一些删除来完成。但我无法想出简单的好解决方案。
我的配置的入站部分:
@Bean
@Autowired
public IntegrationFlow pollMessages(MongoDbFactory mongoDbFactory, SomeService someService) {
return IntegrationFlows.from(
mongoMessageSource(mongoDbFactory),
c -> c.poller(Pollers.fixedDelay(1, TimeUnit.SECONDS)))
.handle(someService, "process")
.get();
}
@Bean
@Autowired
public MongoDbMessageSource mongoMessageSource(MongoDbFactory mongo) {
MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
messageSource.setEntityClass(MessageEntity.class);
messageSource.setCollectionNameExpression(new LiteralExpression("messages"));
return messageSource;
}
解决方案
这是正确的。要实现这样的要求,您需要查看:
/**
* Specify the {@link TransactionSynchronizationFactory} to attach a
* {@link org.springframework.transaction.support.TransactionSynchronization}
* to the transaction around {@code poll} operation.
* @param transactionSynchronizationFactory the TransactionSynchronizationFactory to use.
* @return the spec.
*/
public PollerSpec transactionSynchronizationFactory(
TransactionSynchronizationFactory transactionSynchronizationFactory) {
并真正从TransactionSynchronizationProcessor.processAfterCommit()
.
有关详细信息,请参阅参考手册。
对于 XML 配置,我们有这个测试用例:
<int-mongodb:inbound-channel-adapter id="inboundAdapterWithOnSuccessDisposition"
channel="replyChannel"
query="{'name' : 'Bob'}"
auto-startup="false">
<int:poller fixed-delay="200" max-messages-per-poll="1">
<int:advice-chain synchronization-factory="syncFactory">
<bean
class="org.springframework.integration.mongodb.config.MongoDbInboundChannelAdapterIntegrationTests.TestMessageSourceAdvice" />
<tx:advice>
<tx:attributes>
<tx:method name="*" />
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:poller>
</int-mongodb:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:before-commit expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager" />
Java DSL 也可以做类似的事情。
你需要一个DefaultTransactionSynchronizationFactory
andExpressionEvaluatingTransactionSynchronizationProcessor
来配置这件事。对的,同样PseudoTransactionManager
可以使用。
虽然您也可以考虑remove/update
在流程结束时手动调用。
推荐阅读
- mysql - getaddrinfo ENOTFOUND - 使用 node.js 连接到 mysql 数据库时
- haskell - 如何从 Haskell-Stack 项目的测试文件夹中导入文件?
- python - Nested tree DataFrame reshaping
- node.js - Has anybody found out how to correctly autoindent .ejs in VS Code?
- java - Unable to create a new remote session. Hostname (IP_ADDRESS) not verified
- python-3.x - 在 Airflow 2.0 中读取 yaml 配置文件并创建 DAG 生成器
- c++ - 制造图灵机但无法结束
- plotly - 用 plotly mapbox DensityMapBox 绘制 k_means 簇的密度
- delphi - 有没有办法确定滚动条在 TDBGrid 上是否可见
- php - foreach 循环内的猫头鹰旋转木马