首页 > 解决方案 > 合并通量的 Spring Webflux 类型安全性

问题描述

我已经通过使用 Project Reactor 设置从服务器到客户端的单向通信来实现服务器发送事件。

在服务器上,我添加了一个字符串类型的通量连接:

Flux<String> addDataConnection() {
        return Flux.create(emitter -> {
            fluxSinks.add(emitter);
        });
    }

每当我收到消息时,它就会传播消息:

void propagateMessage(String messageContent) {
        fluxSinks.parallelStream()
                .forEach(fluxSink -> fluxSink.next(messageContent));
    }

此数据来自外部来源,因此传入事件的频率超出了我的控制范围。

因此,我添加了一个额外的心跳流,我将它与我的数据连接流合并。这样,我既可以保持连接活动,也可以检测终止的连接,例如当浏览器窗口关闭时:

Flux.interval(Duration duration)实际上返回一个 Long,但我将它映射到一个 String 以匹配数据连接的类型安全。这个字符串实际上并没有被任何人使用

Flux<String> addHeartbeat() {
    return Flux.interval(Duration.ofSeconds(2))
            .map(i -> "heartbeat");
}

我的问题是关于助焊剂的元素类型。因为我现在在我的数据连接上返回字符串,所以我可以简单地从心跳返回一个字符串,因此Flux<String>从通量的合并中返回一个:

Flux<String> initiateFluxStreams() {
        return Flux.merge(addDataConnection(), addHeartbeat());
    }

但是,如果我的数据流返回一个更复杂的对象并且我想确保通量合并方法中的类型安全怎么办?我总是可以返回一个类型Flux<?>来让我的编译器满意,但我对这个解决方案并不满意,因为它允许我(或其他人)返回他们想要的任何东西。通量的合并是否支持更详细的编译安全性?

编辑:我在https://stackoverflow.com/a/49082329/3432964看到了答案,以及它如何<? extends someMarkerInterface>用于该类型。这是我唯一的选择吗?将整个事物包装在一个对象中会更好ServerSentEvent吗?

标签: javaspring-webfluxproject-reactor

解决方案


对于不同类型的合并,没有“神奇”的方法可以做到这一点Flux,因为您无法保证这些类型将具有任何共同的祖先(除了 Object,但正如您指出的那样,这并不太有用。)

如果您的类型是Foo,您可以有一个“特殊”实例(或子类),Foo仅表示为空的心跳类型,但我并不特别喜欢这种方法 - 它可能会牺牲设计,只是为了在其中撬开某种类型.

我的首选方法(注意,在其他地方没有看到这个,所以我不能声称这是一种“常用方法”)是使用 a Flux<Optional<Foo>>,其中“心跳”触发器只是空的选项。就像是:

Flux<Foo> addDataConnection() {
    return Flux.create(emitter -> {
        fluxSinks.add(emitter);
    });
}

Flux<Optional<Foo>> initiateFluxStreams() {
    return Flux.merge(addDataConnection().map(f -> Optional.of(f)), addHeartbeat().map(h -> Optional.empty()));
}

Flux<Long> addHeartbeat() {
    return Flux.interval(Duration.ofSeconds(2));
}

然后,您可以Flux在处理之前过滤和映射结果以清除心跳:

fluxOfOptionalFoo
        .filter(Optional::isPresent)
        .map(Optional::get);

出于多种原因,我更喜欢这种方法:

  • 不需要对底层对象结构进行任何更改
  • 更具描述性的是,您有一种Flux“可能存在或不存在”的类型
  • 容易过滤掉空的选项(或者如果你想用心跳功能做一些特别的事情)

如果您愿意,如果您认为 optional 不够“描述性”,您当然可以实现自己的“心跳”泛型类,但我个人不会说这是必要的。


推荐阅读