首页 > 解决方案 > 带有 ReactiveMongoRepository 的 Spring WebFlux:不通过流获取数据库更新

问题描述

我面临的问题:

无论我通过各种教程尝试使用 Spring Reactive (WebFlux) REST API,我都无法让它工作。当我最初调用我的端点时,我能够从 MongoDB 集合中获取结果。但是,每当我对文档条目进行更新或添加新文档时,它都不会通过文本事件流进行更新。每次我必须再次调用端点以获得新结果。

设置:

目前我有以下设置:

我正在使用 Spring Webflux、Spring Cloud Gateway 和 Spring ReactiveMongoRepository。

依赖项包括 Spring Boot 主服务的 pom.xml:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.security</groupId>
            <artifactId>spring-security-oauth2-jose</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>

ReactiveMongoRepository 的代码:

@Repository
public interface TestRepository extends ReactiveMongoRepository<TestIntegration, String> {

    @Query(("{'userId': ?0}"))
    Flux<TestIntegration> findbyUserId(String userId);
}

其余控制器的代码:

@GetMapping(value = "main/integrations", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<TestIntegration> retrieveIntegrations(ServerWebExchange exchange) {
        return testRepository.findAll();
}

根据我读过的每个教程/指南,这应该按照这种方法工作。有没有人也经历过这种情况或者可能能够帮助解决这个问题?目前卡在这几天...

标签: javaspring-bootspring-webfluxspring-cloud-gateway

解决方案


这按预期工作。当您从 a 流式传输项目时,只要流中Flux<T>有项目,它就会馈送。然后流将关闭。因此,在您的情况下,它从数据库中获取所有数据,将其流式传输给您,然后关闭。

如果您希望保持流打开,则需要继续发送数据。一种方法是:keep alive使用 . 发送消息(以逗号开头的消息)ServerSentsEvents您可以在官方 mozilla 文档中阅读有关这些类型的消息的ServerSentEvents更多信息。colon operator

当您能够实际保持流打开并希望发送数据时,您的服务将不知道何时将新数据写入数据库。因此,要么轮询数据库,要么在写入内容时触发事件,以获取新写入的数据并将其放入流中。

如何将数据放置在连续的开放流中是一个太大的话题,无法在这里解释。但我建议您阅读官方 reactor 文档中的以下部分:

以编程方式创建序列

处理器和接收器


推荐阅读