spring-webflux - Spring WebFlux(Flux):如何动态发布
问题描述
我是响应式编程和 Spring WebFlux 的新手。我想让我的 App 1 通过 Flux 发布 Server Sent 事件,我的 App 2 连续监听它。
我希望 Flux 按需发布(例如,当发生某些事情时)。我发现的所有示例都是使用 Flux.interval 定期发布事件,一旦创建,似乎无法在 Flux 中附加/修改内容。
我怎样才能实现我的目标?或者我在概念上完全错误。
解决方案
FluxProcessor
使用和“动态”发布FluxSink
手动向 提供数据的技术之一Flux
是使用FluxProcessor#sink
以下示例中的方法
@SpringBootApplication
@RestController
public class DemoApplication {
final FluxProcessor processor;
final FluxSink sink;
final AtomicLong counter;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
public DemoApplication() {
this.processor = DirectProcessor.create().serialize();
this.sink = processor.sink();
this.counter = new AtomicLong();
}
@GetMapping("/send")
public void test() {
sink.next("Hello World #" + counter.getAndIncrement());
}
@RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent> sse() {
return processor.map(e -> ServerSentEvent.builder(e).build());
}
}
在这里,我创建DirectProcessor
了为了支持多个订阅者,它将监听数据流。此外,我还提供了额外FluxProcessor#serialize
的内容,为多生产者提供安全支持(从不同线程调用而不违反 Reactive Streams 规范规则,尤其是规则 1.3)。最后,通过调用“http://localhost:8080/send”我们将看到消息Hello World #1
(当然,只有在你之前连接到“http://localhost:8080”的情况下)
Reactor 3.4 更新
使用 Reactor 3.4,您有一个名为reactor.core.publisher.Sinks
. Sinks
API 为手动数据发送提供了一个流利的构建器,它允许您指定诸如流中的元素数量和背压行为、支持的订阅者数量和重播能力等内容:
@SpringBootApplication
@RestController
public class DemoApplication {
final Sinks.Many sink;
final AtomicLong counter;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
public DemoApplication() {
this.sink = Sinks.many().multicast().onBackpressureBuffer();
this.counter = new AtomicLong();
}
@GetMapping("/send")
public void test() {
EmitResult result = sink.tryEmitNext("Hello World #" + counter.getAndIncrement());
if (result.isFailure()) {
// do something here, since emission failed
}
}
@RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent> sse() {
return sink.asFlux().map(e -> ServerSentEvent.builder(e).build());
}
}
注意,通过 API 发送消息Sinks
引入了一个新的概念emission
及其结果。这种 API 的原因是 Reactor 扩展了 Reactive-Streams 并且必须遵循背压控制。也就是说,如果您emit
的信号比请求的多,并且底层实现不支持缓冲,则您的消息将不会被传递。因此,tryEmitNext
返回的结果EmitResult
表明消息是否已发送。
另外,请注意,默认情况下Sinsk
API 提供了 的序列化版本Sink
,这意味着您不必关心并发性。但是,如果您事先知道消息的发送是串行的,您可以构建一个Sinks.unsafe()
不序列化给定消息的版本
推荐阅读
- google-apps-script - 将触发器移动到电子表格的副本
- javascript - 当应用程序处于非活动状态或在反应原生android中关闭时,如何获取用户当前位置的经纬度点?
- php - 如何计算每个数组的结果
- python - 随机选择字符串中元素的百分比并更改值
- html - 使用 Kanna Swift 仅解析 HTML 中的纯文本
- slack - Slack - 更改消息提醒的周开始日
- javascript - 在构造函数中访问道具的正确方法是什么?
- hex - 使用wireshark从UDP十六进制转储中获取jpg
- anaconda - (111) 在 windows server 2012 上运行的 jupyter notebook 中拒绝连接到 localhost
- python - Python:从外部()调用内部()