java - Spring Webflux 端点作为主题工作
问题描述
我有一个 Flux 端点,我提供给客户(订阅者)以接收更新的价格。我正在测试它通过浏览器访问 URL (http://localhost:8080/prices),它工作正常。我面临的问题(我可能在这里遗漏了一些概念)是当我在许多浏览器中打开此 URL 时,我希望在所有浏览器中都收到通知,但只有一个收到。它作为队列而不是主题工作(如在消息代理中)。这是正确的行为吗?
@GetMapping(value = "prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Collection<Price>>> prices() {
return Flux.interval(Duration.ofSeconds(5))
.map(sec -> pricesQueue.get())
.filter(prices -> !prices.isEmpty())
.map(prices -> ServerSentEvent.<Collection<Price>> builder()
.event("status-changed")
.data(prices)
.build());
}
解决方案
get
不是标准的队列操作,但这几乎可以肯定是因为您的pricesQueue.get()
方法不是幂等的。pricesQueue.get()
对于每个请求(在这种情况下,您打开的每个浏览器窗口),您都会得到一个每 5 秒调用一次的新通量。现在,如果pricesQueue.get()
只是检索队列中的最新项目并且什么都不做,一切都很好 - 您的所有订阅者都会收到相同的项目,并显示相同的项目。但是,如果它更像是poll()
在检索队列后删除队列中的项目,那么只有第一个通量会获得该值 - 其余的不会,因为到那时它将被删除。
您在这里实际上有两个主要选择:
- 更改您的
get()
实现(或实现一个新方法),以便它不会改变队列,只检索一个值。 - 将助焊剂变成热助焊剂。存储
Flux.interval(Duration.ofSeconds(5)).map(sec -> pricesQueue.get()).publish().autoConnect()
在某处作为字段(假设为queueFlux
),然后queueFlux.filter(prices -> !prices.isEmpty()).map(...)
在您的控制器方法中返回。
推荐阅读
- python - 将参数从 html 页面 url 映射传递到 django 中的视图
- r - 使用 data.table 按组抽样而不重复
- html - 如果我将宽度指定为适合内容,如何从页面中的所有 h4 标签中找到 h4 标签的最大宽度?
- c - c tcpclient连接错误,关于socket,并选择
- java - 如何围绕设置器创建通用方法?
- node.js - UniversalWindowsPlatform (Node.js + Xamarin) 中的文本 CL/RF 错误
- eclipse - Eclipse C++:如何跳到第一个断点?
- firebase - 如何检索 Firebase 用户属性?
- css - 如果它前面有一个带有特殊类的 div,则在另一个类中设置一个类的样式
- c# - 从 Azure 发出 BadRequest 的持续推送消息