首页 > 解决方案 > 如何访问 MongoDB 支持的 Spring Integration 消息存储中存储的消息?

问题描述

我实现了一个QueueChannelMongoDbChannelMessageStore. 消息生产和我使用消息的集成流程都按预期工作。

现在我正在尝试实现一个逻辑,该逻辑列出并记录当前包含在消息存储或队列通道中的所有消息。消息应与它们的 POJO 有效负载(toString()格式)一起记录。消息不应通过列出来从队列通道中删除。该逻辑应该在应用程序启动期间或按需调用。

这是一些代码片段(我使用的是 Spring Boot 2.3.4)。

我的消息有效负载:

@Data
public class ExampleMessage implements Serializable {
    private String id;
    private Instant timestamp;
}

我的集成配置:

@SpringBootApplication
@EnableIntegration
@Slf4j
public class IntegrationApp {

    private static final String GROUP_ID = "my-group";

    // ... main method omitted

    @Bean
    public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory mongoDatabaseFactory) {
        return new MongoDbChannelMessageStore(mongoDatabaseFactory, "message-store");
    }

    @Bean
    public PollableChannel channel(MongoDbChannelMessageStore messageStore) {
        MessageGroupQueue messageGroupQueue = new MessageGroupQueue(messageStore, GROUP_ID);
        return new QueueChannel(messageGroupQueue);
    }

    @Bean
    public IntegrationFlow integrationFlow(PollableChannel channel) {
        return IntegrationFlows.from(channel)
                .handle(message -> log.info("Message received: {}", message.getPayload()),
                        e -> e.poller(Pollers
                            .fixedRate(7000, 5000)
                            .maxMessagesPerPoll(1)
                            .taskExecutor(Executors.newSingleThreadExecutor())))
                .get();
    }

我的消息制作人:

@Component
@EnableScheduling
@Slf4j
public class ExampleMessageProducer {

    @Autowired
    private PollableChannel channel;

    private final MessagingTemplate messagingTemplate = new MessagingTemplate();

    @Scheduled(initialDelay = 1000, fixedDelay = 3000)
    void produceMessage() {
        ExampleMessage exampleMessage = new ExampleMessage();
        messagingTemplate.send(channel, MessageBuilder.withPayload(exampleMessage).build());
        log.info("Message sent: {}", exampleMessage);
    }
}

从 Spring Integration 消息传递的公共 API 中,可以针对该问题派生以下方法:

@Component
@Slf4j
public class EventListenerBean {

    @Autowired
    MongoDbChannelMessageStore messageStore;

    @EventListener
    public void onApplicationEvent(ContextRefreshedEvent event) {

        Collection<Message<?>> messages = messageStore.getMessageGroup(GROUP_ID).getMessages();
        log.info("# of messages in group: {}", messages.size());
        messages.forEach(m -> log.info("Stored message: {}", m.getPayload()));
    }
}

不幸的是,这种方法会导致一个空的集合或消息流(可能是由于消息存储实现中的某种延迟加载?)。

2020-11-03 17:05:42.398  INFO 4748 --- [           main] m.t.s.integration.EventListenerBean      : # of messages in group: 0
2020-11-03 17:05:42.420  INFO 4748 --- [           main] m.t.spring.integration.IntegrationApp    : Started IntegrationApp in 6.557 seconds (JVM running for 7.532)
2020-11-03 17:05:47.533  INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp    : Message received: ExampleMessage(id=d803b591-2f47-412d-9d64-e8efb424f393, timestamp=2020-11-03T15:43:34.253162Z)
2020-11-03 17:05:48.345  INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp    : Message received: ExampleMessage(id=7361b7d6-5c36-4801-851e-6d61dc18ebb2, timestamp=2020-11-03T15:43:36.259450500Z)
2020-11-03 17:05:49.348  INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp    : Message received: ExampleMessage(id=7fe98beb-bd0b-4fda-a2c2-bbab2d80d1e9, timestamp=2020-11-03T15:43:38.265175Z)
2020-11-03 17:05:50.347  INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp    : Message received: ExampleMessage(id=375d92b6-8746-4478-911a-85b34c2ec2ab, timestamp=2020-11-03T15:43:40.270516100Z)

我想避免message-store直接查询 MongoDB 集合,而是更喜欢使用 Spring Integration API。

有没有人可以解决这个问题?先感谢您。

标签: mongodbspring-integration

解决方案


如果你对具体的感兴趣MessageGroup,可以使用合约的getMessageGroup()API 。BasicMessageGroupStore

无论哪种方式,在 bean 初始化阶段访问像数据库这样的低级资源都是不好的,就像在channelbean 定义中处理日志一样。您必须推迟这样的操作,直到整个应用程序上下文准备好。或抓住一个ContextRefreshedEvent,或执行一个SmartLifecycle.start()合同。

更新

事实证明,您MongoDbChannelMessageStore对它的 API 是正确的。我们绝对无法访问该合约中的消息:

/**
 * Not fully used. Only wraps the provided group id.
 */
@Override
public MessageGroup getMessageGroup(Object groupId) {
    return getMessageGroupFactory().create(groupId);
}

所以,我们只是创建一个新的空组,并且没有任何与 MongoDB 集合中的任何内容挂钩。

作为一种解决方法,我建议您将常规ConfigurableMongoDbMessageStore作为 bean 针对同一个message-store集合。这个已经为我们提供了一个 API 来迭代组及其消息。因此,您将用于对其MongoDbChannelMessageStore进行主动操作QueueChannel并定期ConfigurableMongoDbMessageStore用于读取集合的内容。

我们可能需要考虑在ChannelMessageStore. 看起来没有什么害处……请随意提出 GH 问题来进行改进!


推荐阅读