首页 > 解决方案 > Spring Webflux 端点作为主题工作

问题描述

我有一个 Flux 端点,我提供给客户(订阅者)以接收更新的价格。我正在测试它通过浏览器访问 URL (http://localhost:8080/prices),它工作正常。我面临的问题(我可能在这里遗漏了一些概念)是当我在许多浏览器中打开此 URL 时,我希望在所有浏览器中都收到通知,但只有一个收到。它作为队列而不是主题工作(如在消息代理中)。这是正确的行为吗?

@GetMapping(value = "prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Collection<Price>>> prices() {
return Flux.interval(Duration.ofSeconds(5))
        .map(sec -> pricesQueue.get())
        .filter(prices -> !prices.isEmpty())
        .map(prices -> ServerSentEvent.<Collection<Price>> builder()
                .event("status-changed")
                .data(prices)
                .build());
}

标签: javaspring-bootspring-webfluxreactive

解决方案


get不是标准的队列操作,但这几乎可以肯定是因为您的pricesQueue.get()方法不是幂等的。pricesQueue.get()对于每个请求(在这种情况下,您打开的每个浏览器窗口),您都会得到一个每 5 秒调用一次的新通量。现在,如果pricesQueue.get()只是检索队列中的最新项目并且什么都不做,一切都很好 - 您的所有订阅者都会收到相同的项目,并显示相同的项目。但是,如果它更像是poll()在检索队列后删除队列中的项目,那么只有第一个通量会获得该值 - 其余的不会,因为到那时它将被删除。

您在这里实际上有两个主要选择:

  • 更改您的get()实现(或实现一个新方法),以便它不会改变队列,只检索一个值。
  • 将助焊剂变成热助焊剂。存储Flux.interval(Duration.ofSeconds(5)).map(sec -> pricesQueue.get()).publish().autoConnect()在某处作为字段(假设为queueFlux),然后queueFlux.filter(prices -> !prices.isEmpty()).map(...)在您的控制器方法中返回。

推荐阅读