首页 > 解决方案 > 如何获取服务器端事件 Flux 中的记录总数

问题描述

我正在使用响应式编程,其中客户端接收到服务器端事件的事件流,然后消耗这些事件。功能明智的工作。当我尝试记录通量流中的总记录数时遇到问题。下面是代码片段。

让我们创建一个连接到服务器的实例

        final WebClient client = WebClient
            .builder()
            .baseUrl(url)
            .build();

然后我们开始连接,订阅它的主题

        final Flux<ServerSentEvent<SomeEvent>> eventStream = client.get()
            .uri("/bus/sse?id=" + subscription)
            .accept(MediaType.TEXT_EVENT_STREAM)
            .exchange()
            .flatMapMany(response -> response.bodyToFlux(type))
            .repeat();

        log(eventStream, "connectSSE");

        eventStream
            .doOnError(throwable -> this.onError(throwable, eventStream))
            .doOnComplete(() -> this.onComplete(eventStream));            

        subscribe = eventStream.subscribe(someServerSideEvent -> this.onEvent(someServerSideEvent , eventStream));

下面的方法处理事件

  private void onEvent(final ServerSentEvent<SomeEvent> content, Flux<ServerSentEvent<SomeEvent>> eventStream) {
    log(eventStream, "onEvent");
    //Code for handling event
}

我对下面的代码有疑问。实际上我想记录流中的记录数,我期待它会打印一些数字,但它会打印如下所示的内容。需要一些不使用 .block() 的解决方案。欢迎任何帮助。

“此 Flux 中的计数值:MonoMapFuseable,调用者 onEvent”

private void log1(Flux<ServerSentEvent<SomeEvent>> eventStream, final String caller) {
    try {
        eventStream.count().map(count -> {
            LOGGER.info("Counted values in this Flux: {}, caller {}", count.longValue(), caller);
            return count;
        });
    } catch (final Exception e) {
        LOGGER.info("Counted values in this Flux failed", e);
    }
}

标签: monoflux

解决方案


推荐阅读