spring-webflux - Spring WebFlux Reactive MongoDB - 如何组合两个更改流?
问题描述
在我的应用程序中,我想组合 2 个更改流来侦听对某些过滤器对象的更改以及对实际有问题的集合的插入/删除(我将驻留在不同集合中的过滤器应用于该集合)
因此我有一个这样的近似代码:
public Flux<List<Object>> findAll(String userId){
<some code here>
return service.findByUserId(userId).
< here i get initial set of data when change streams do not act >
.concatWith(reactiveMongoTemplate
.changeStream("Filter",options,Filter.class)
< here i listen to changes to the filter >
)
.concatWith(reactiveMongoTemplate
.changeStream("Object",options,Object.class)
< here i listen to changes to the collection itself >
);
}
问题是 - 第二个更改流不起作用。这我可以通过切换它们来判断——因为我切换了更高的concatWith
作品,而下面的作品则没有。
问题
你有遇到过这样的行为吗?
有什么更好的方法可以对 2+ 个集合进行一些复杂的侦听以将这些更改传递给 UI?
更新
这是我拥有的全部方法 - 一个 changeStream 工作,另一个不工作:
public Flux<List<EventDTO>> findAll(String userId){
Aggregation fluxAggregation = changeStreamHelper.createAggregationBasedOnUserId(userId);
ChangeStreamOptions options = changeStreamHelper.createChangeStreamOpts(fluxAggregation);
return eventListFilterService.findEventListFilterByUserId(userId).flatMap(fltr -> Mono.just(activeEventsFilter.applyCriterion(null, fltr))
.flatMap(criteria -> eventFilteredRepository.findEventsByCriteria(criteria)
.flatMap(eventMapper::toDto).collectList()))
.concatWith(reactiveMongoTemplate
.changeStream("EventListFilter", options, EventListFilter.class).map(ChangeStreamEvent::getBody)
.map(fltr -> activeEventsFilter.applyCriterion(null, fltr))
.flatMap(criteria -> eventFilteredRepository.findEventsByCriteria(criteria)
.flatMap(eventMapper::toDto).collectList())
)
.concatWith(reactiveMongoTemplate
.changeStream("Event", changeStreamHelper.createSimpleChangeStreamOpts(), Event.class).map(ChangeStreamEvent::getBody)
.flatMap(event -> eventListFilterService.findEventListFilterByUserId(userId).flatMap(fltr -> Mono.just(activeEventsFilter.applyCriterion(null, fltr))
.flatMap(criteria -> eventFilteredRepository.findEventsByCriteria(criteria)
.flatMap(eventMapper::toDto).collectList())
)));
}
解决方案
推荐阅读
- bison - 野牛:我的语法中的移位减少冲突
- python-3.x - 我已经下载了 python 3.8.5,但我看到的仍然是 3.7.4 版本。为什么?
- android - AlarmManager 未触发通知
- javascript - 打字稿:遍历复选框(由ngfor制作)并打印每个复选框的值
- java - PostMapping 逻辑错误
- swift - SwiftUI 隐藏 UIKit UINavigationController(rootViewController: _) 的导航栏
- flutter - 制作元素之间没有间隙的网格视图
- java - 通过 Wi-Fi Direct 发送的图像已损坏
- xml - 通过 XML 解析器推送数据是否会根据本规范执行验证?
- sql-server - 为什么删除行在 SQL Server 中似乎很慢?