首页 > 解决方案 > 如何使用 Spring Integration 和 MongoDB 实现消息队列?

问题描述

如何配置 Spring Integration,以便从集合中删除已处理的消息。在 MongoDB 控制台中,我可以简单地调用:

db.messages.findAndModify({ remove:true })

但在 MongoDbMessageSource 中只读取消息

mongoTemplate.find(..)

我想它可以通过在事务中进行一些删除来完成。但我无法想出简单的好解决方案。

我的配置的入站部分:

@Bean
@Autowired
public IntegrationFlow pollMessages(MongoDbFactory mongoDbFactory, SomeService someService) {
    return IntegrationFlows.from(
            mongoMessageSource(mongoDbFactory),
            c -> c.poller(Pollers.fixedDelay(1, TimeUnit.SECONDS)))
            .handle(someService, "process")
            .get();
}

@Bean
@Autowired
public MongoDbMessageSource mongoMessageSource(MongoDbFactory mongo) {
    MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
    messageSource.setEntityClass(MessageEntity.class);
    messageSource.setCollectionNameExpression(new LiteralExpression("messages"));

    return messageSource;
}

标签: javamongodbspring-integrationmessage-queuespring-data-mongodb

解决方案


这是正确的。要实现这样的要求,您需要查看:

/**
 * Specify the {@link TransactionSynchronizationFactory} to attach a
 * {@link org.springframework.transaction.support.TransactionSynchronization}
 * to the transaction around {@code poll} operation.
 * @param transactionSynchronizationFactory the TransactionSynchronizationFactory to use.
 * @return the spec.
 */
public PollerSpec transactionSynchronizationFactory(
        TransactionSynchronizationFactory transactionSynchronizationFactory) {

并真正从TransactionSynchronizationProcessor.processAfterCommit().

有关详细信息,请参阅参考手册

对于 XML 配置,我们有这个测试用例:

<int-mongodb:inbound-channel-adapter id="inboundAdapterWithOnSuccessDisposition"
                                     channel="replyChannel"
                                     query="{'name' : 'Bob'}"
                                     auto-startup="false">

    <int:poller fixed-delay="200" max-messages-per-poll="1">
        <int:advice-chain  synchronization-factory="syncFactory">
            <bean
                    class="org.springframework.integration.mongodb.config.MongoDbInboundChannelAdapterIntegrationTests.TestMessageSourceAdvice" />
            <tx:advice>
                <tx:attributes>
                    <tx:method name="*" />
                </tx:attributes>
            </tx:advice>
        </int:advice-chain>
    </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:before-commit expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"/>
</int:transaction-synchronization-factory>

<bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager" />

Java DSL 也可以做类似的事情。

你需要一个DefaultTransactionSynchronizationFactoryandExpressionEvaluatingTransactionSynchronizationProcessor来配置这件事。对的,同样PseudoTransactionManager可以使用。

虽然您也可以考虑remove/update在流程结束时手动调用。


推荐阅读