mono - 如何获取服务器端事件 Flux 中的记录总数
问题描述
我正在使用响应式编程,其中客户端接收到服务器端事件的事件流,然后消耗这些事件。功能明智的工作。当我尝试记录通量流中的总记录数时遇到问题。下面是代码片段。
让我们创建一个连接到服务器的实例
final WebClient client = WebClient
.builder()
.baseUrl(url)
.build();
然后我们开始连接,订阅它的主题
final Flux<ServerSentEvent<SomeEvent>> eventStream = client.get()
.uri("/bus/sse?id=" + subscription)
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.flatMapMany(response -> response.bodyToFlux(type))
.repeat();
log(eventStream, "connectSSE");
eventStream
.doOnError(throwable -> this.onError(throwable, eventStream))
.doOnComplete(() -> this.onComplete(eventStream));
subscribe = eventStream.subscribe(someServerSideEvent -> this.onEvent(someServerSideEvent , eventStream));
下面的方法处理事件
private void onEvent(final ServerSentEvent<SomeEvent> content, Flux<ServerSentEvent<SomeEvent>> eventStream) {
log(eventStream, "onEvent");
//Code for handling event
}
我对下面的代码有疑问。实际上我想记录流中的记录数,我期待它会打印一些数字,但它会打印如下所示的内容。需要一些不使用 .block() 的解决方案。欢迎任何帮助。
“此 Flux 中的计数值:MonoMapFuseable,调用者 onEvent”
private void log1(Flux<ServerSentEvent<SomeEvent>> eventStream, final String caller) {
try {
eventStream.count().map(count -> {
LOGGER.info("Counted values in this Flux: {}, caller {}", count.longValue(), caller);
return count;
});
} catch (final Exception e) {
LOGGER.info("Counted values in this Flux failed", e);
}
}
解决方案
推荐阅读
- swift - a和b之和减去c是多少?
- python - 斌 zc.buildout
- python - 合并数据框熊猫中的列
- android - 启用 androidX 会导致“解决后无法更改配置策略 ':app:compile'”错误
- c++ - 返回 STRING 的 MySQL UDF 与数据重叠
- google-cloud-platform - 通过 api 调用访问 google Cloud IAM 策略所需的范围列表
- git - PowerShell git push 作为计划任务
- python - 按顺序将浮点值映射到元组
- linux - find 命令重命名每个文件和目录。最大深度 1
- javascript - 打开一个新窗口并关闭另一个