mongodb - 如何访问 MongoDB 支持的 Spring Integration 消息存储中存储的消息?
问题描述
我实现了一个QueueChannel
由MongoDbChannelMessageStore
. 消息生产和我使用消息的集成流程都按预期工作。
现在我正在尝试实现一个逻辑,该逻辑列出并记录当前包含在消息存储或队列通道中的所有消息。消息应与它们的 POJO 有效负载(toString()
格式)一起记录。消息不应通过列出来从队列通道中删除。该逻辑应该在应用程序启动期间或按需调用。
这是一些代码片段(我使用的是 Spring Boot 2.3.4)。
我的消息有效负载:
@Data
public class ExampleMessage implements Serializable {
private String id;
private Instant timestamp;
}
我的集成配置:
@SpringBootApplication
@EnableIntegration
@Slf4j
public class IntegrationApp {
private static final String GROUP_ID = "my-group";
// ... main method omitted
@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory mongoDatabaseFactory) {
return new MongoDbChannelMessageStore(mongoDatabaseFactory, "message-store");
}
@Bean
public PollableChannel channel(MongoDbChannelMessageStore messageStore) {
MessageGroupQueue messageGroupQueue = new MessageGroupQueue(messageStore, GROUP_ID);
return new QueueChannel(messageGroupQueue);
}
@Bean
public IntegrationFlow integrationFlow(PollableChannel channel) {
return IntegrationFlows.from(channel)
.handle(message -> log.info("Message received: {}", message.getPayload()),
e -> e.poller(Pollers
.fixedRate(7000, 5000)
.maxMessagesPerPoll(1)
.taskExecutor(Executors.newSingleThreadExecutor())))
.get();
}
我的消息制作人:
@Component
@EnableScheduling
@Slf4j
public class ExampleMessageProducer {
@Autowired
private PollableChannel channel;
private final MessagingTemplate messagingTemplate = new MessagingTemplate();
@Scheduled(initialDelay = 1000, fixedDelay = 3000)
void produceMessage() {
ExampleMessage exampleMessage = new ExampleMessage();
messagingTemplate.send(channel, MessageBuilder.withPayload(exampleMessage).build());
log.info("Message sent: {}", exampleMessage);
}
}
从 Spring Integration 消息传递的公共 API 中,可以针对该问题派生以下方法:
@Component
@Slf4j
public class EventListenerBean {
@Autowired
MongoDbChannelMessageStore messageStore;
@EventListener
public void onApplicationEvent(ContextRefreshedEvent event) {
Collection<Message<?>> messages = messageStore.getMessageGroup(GROUP_ID).getMessages();
log.info("# of messages in group: {}", messages.size());
messages.forEach(m -> log.info("Stored message: {}", m.getPayload()));
}
}
不幸的是,这种方法会导致一个空的集合或消息流(可能是由于消息存储实现中的某种延迟加载?)。
2020-11-03 17:05:42.398 INFO 4748 --- [ main] m.t.s.integration.EventListenerBean : # of messages in group: 0
2020-11-03 17:05:42.420 INFO 4748 --- [ main] m.t.spring.integration.IntegrationApp : Started IntegrationApp in 6.557 seconds (JVM running for 7.532)
2020-11-03 17:05:47.533 INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp : Message received: ExampleMessage(id=d803b591-2f47-412d-9d64-e8efb424f393, timestamp=2020-11-03T15:43:34.253162Z)
2020-11-03 17:05:48.345 INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp : Message received: ExampleMessage(id=7361b7d6-5c36-4801-851e-6d61dc18ebb2, timestamp=2020-11-03T15:43:36.259450500Z)
2020-11-03 17:05:49.348 INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp : Message received: ExampleMessage(id=7fe98beb-bd0b-4fda-a2c2-bbab2d80d1e9, timestamp=2020-11-03T15:43:38.265175Z)
2020-11-03 17:05:50.347 INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp : Message received: ExampleMessage(id=375d92b6-8746-4478-911a-85b34c2ec2ab, timestamp=2020-11-03T15:43:40.270516100Z)
我想避免message-store
直接查询 MongoDB 集合,而是更喜欢使用 Spring Integration API。
有没有人可以解决这个问题?先感谢您。
解决方案
如果你对具体的感兴趣MessageGroup
,可以使用合约的getMessageGroup()
API 。BasicMessageGroupStore
无论哪种方式,在 bean 初始化阶段访问像数据库这样的低级资源都是不好的,就像在channel
bean 定义中处理日志一样。您必须推迟这样的操作,直到整个应用程序上下文准备好。或抓住一个ContextRefreshedEvent
,或执行一个SmartLifecycle.start()
合同。
更新
事实证明,您MongoDbChannelMessageStore
对它的 API 是正确的。我们绝对无法访问该合约中的消息:
/**
* Not fully used. Only wraps the provided group id.
*/
@Override
public MessageGroup getMessageGroup(Object groupId) {
return getMessageGroupFactory().create(groupId);
}
所以,我们只是创建一个新的空组,并且没有任何与 MongoDB 集合中的任何内容挂钩。
作为一种解决方法,我建议您将常规ConfigurableMongoDbMessageStore
作为 bean 针对同一个message-store
集合。这个已经为我们提供了一个 API 来迭代组及其消息。因此,您将用于对其MongoDbChannelMessageStore
进行主动操作QueueChannel
并定期ConfigurableMongoDbMessageStore
用于读取集合的内容。
我们可能需要考虑在ChannelMessageStore
. 看起来没有什么害处……请随意提出 GH 问题来进行改进!
推荐阅读
- angular - angular7上带有图像主体的Http请求
- react-native - 检查是否已设置导航状态参数对象
- python - 如何获得干净的 YouTube 成绩单
- scala - scala spark 使用 expr 在列内赋值
- tinymce - TinyMCE 中 MCE 的完整形式是什么?
- google-chrome-devtools - Chrome DevTools - SQL 查询未执行
- spring - 下载文件并提供服务(Spring WebClient -> Liferay Porlet.serveResource)
- python - 迭代多维数组并搜索形成正方形的点
- pic - 无法执行中断功能
- java - 我正在尝试从 json 正文中获取输入,并且想要验证所有输入是否正确(如果不是通过异常)