首页 > 解决方案 > Spring WebFlux Reactive MongoDB - 如何组合两个更改流?

问题描述

在我的应用程序中,我想组合 2 个更改流来侦听对某些过滤器对象的更改以及对实际有问题的集合的插入/删除(我将驻留在不同集合中的过滤器应用于该集合)

因此我有一个这样的近似代码:

public Flux<List<Object>> findAll(String userId){
        <some code here>
        return service.findByUserId(userId).
            
            < here i get initial set of data when change streams do not act >

            .concatWith(reactiveMongoTemplate
              .changeStream("Filter",options,Filter.class)

            < here i listen to changes to the filter >

            )
            .concatWith(reactiveMongoTemplate
              .changeStream("Object",options,Object.class)

            < here i listen to changes to the collection itself >

            );
    }

问题是 - 第二个更改流不起作用。这我可以通过切换它们来判断——因为我切换了更高的concatWith作品,而下面的作品则没有。

问题

你有遇到过这样的行为吗?

有什么更好的方法可以对 2+ 个集合进行一些复杂的侦听以将这些更改传递给 UI?

更新

这是我拥有的全部方法 - 一个 changeStream 工作,另一个不工作:

public Flux<List<EventDTO>> findAll(String userId){
        Aggregation fluxAggregation = changeStreamHelper.createAggregationBasedOnUserId(userId);
        ChangeStreamOptions options = changeStreamHelper.createChangeStreamOpts(fluxAggregation);
        return eventListFilterService.findEventListFilterByUserId(userId).flatMap(fltr -> Mono.just(activeEventsFilter.applyCriterion(null, fltr))
            .flatMap(criteria -> eventFilteredRepository.findEventsByCriteria(criteria)
                .flatMap(eventMapper::toDto).collectList()))
            .concatWith(reactiveMongoTemplate
                .changeStream("EventListFilter", options, EventListFilter.class).map(ChangeStreamEvent::getBody)
                .map(fltr -> activeEventsFilter.applyCriterion(null, fltr))
                .flatMap(criteria -> eventFilteredRepository.findEventsByCriteria(criteria)
                    .flatMap(eventMapper::toDto).collectList())
            )
            .concatWith(reactiveMongoTemplate
                .changeStream("Event", changeStreamHelper.createSimpleChangeStreamOpts(), Event.class).map(ChangeStreamEvent::getBody)
                .flatMap(event -> eventListFilterService.findEventListFilterByUserId(userId).flatMap(fltr -> Mono.just(activeEventsFilter.applyCriterion(null, fltr))
                    .flatMap(criteria -> eventFilteredRepository.findEventsByCriteria(criteria)
                        .flatMap(eventMapper::toDto).collectList())
            )));
    }

标签: spring-webfluxchangestreamspring-data-mongodb-reactive

解决方案


推荐阅读