首页 > 解决方案 > 将一个 websocket 客户端的输出作为输入发送到另一个

问题描述

在 SO 上的快速搜索未能找到我类似的问题,所以我们开始吧

我基本上想要带有 Webflux 的 RSocket 的 requestChannel 语法,所以我能够在 WebSocketClient.execute() 方法之外处理接收到的 Flux 并编写类似这样的东西(只有在订阅返回的通量时才打开会话,正确的错误传播,自动完成并在入站和出站通量都完成时关闭 WS 会话 - 由服务器端完成或由消费者取消)

service /f 将其接收到的字符串消息包装在 'f(...)' 中:'str' -> 'f(str)'

service /g 对 'g(...)' 执行相同的操作,并且以下测试通过:

    private final DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();

    private WebSocketMessage serializeString(final String text) {
        return new WebSocketMessage(Type.TEXT, dataBufferFactory.wrap(text.getBytes(StandardCharsets.UTF_8)));
    }

    @Test
    void test() {
        var requests = 5;
        var input = Flux.range(0, requests).map(String::valueOf);

        var wsClient = new ReactorNettyWebSocketClient(
            HttpClient.from(TcpClient.create(ConnectionProvider.newConnection())));

        var f = requestChannel(wsClient, fUri, input.map(this::serializeString))
            .map(WebSocketMessage::getPayloadAsText);

        var g = requestChannel(wsClient, gUri, f.map(this::serializeString))
            .map(WebSocketMessage::getPayloadAsText);

        var responses = g.take(requests);

        var expectedResponses = Stream.range(0, requests)
                                           .map(i -> "g(f(" + i + "))")
                                           .toJavaArray(String[]::new);

        StepVerifier.create(responses)
                    .expectSubscription()
                    .expectNext(expectedResponses)
                    .verifyComplete();
    }

标签: javaspring-webfluxproject-reactor

解决方案


这似乎对我有用......到目前为止

    public static Flux<WebSocketMessage> requestChannel(
        WebSocketClient wsClient, URI uri, Flux<WebSocketMessage> outbound) {

        CompletableFuture<Flux<WebSocketMessage>> recvFuture = new CompletableFuture<>();
        CompletableFuture<Integer> consumerDoneCallback = new CompletableFuture<>();

        var executeMono = wsClient.execute(uri,
            wss -> {
                recvFuture.complete(wss.receive().log("requestChannel.receive " + uri, Level.FINE));
                return wss.send(outbound)
                          .and(Mono.fromFuture(consumerDoneCallback));
            }).log("requestChannel.execute " + uri, Level.FINE);

        return Mono.fromFuture(recvFuture)
                   .flatMapMany(recv -> recv.doOnComplete(() -> consumerDoneCallback.complete(1)))
                   .mergeWith(executeMono.cast(WebSocketMessage.class));
    }

如果我还没有偶然发现这个解决方案有任何缺陷,我会很感兴趣


推荐阅读