java - 将一个 websocket 客户端的输出作为输入发送到另一个
问题描述
在 SO 上的快速搜索未能找到我类似的问题,所以我们开始吧
我基本上想要带有 Webflux 的 RSocket 的 requestChannel 语法,所以我能够在 WebSocketClient.execute() 方法之外处理接收到的 Flux 并编写类似这样的东西(只有在订阅返回的通量时才打开会话,正确的错误传播,自动完成并在入站和出站通量都完成时关闭 WS 会话 - 由服务器端完成或由消费者取消)
service /f 将其接收到的字符串消息包装在 'f(...)' 中:'str' -> 'f(str)'
service /g 对 'g(...)' 执行相同的操作,并且以下测试通过:
private final DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
private WebSocketMessage serializeString(final String text) {
return new WebSocketMessage(Type.TEXT, dataBufferFactory.wrap(text.getBytes(StandardCharsets.UTF_8)));
}
@Test
void test() {
var requests = 5;
var input = Flux.range(0, requests).map(String::valueOf);
var wsClient = new ReactorNettyWebSocketClient(
HttpClient.from(TcpClient.create(ConnectionProvider.newConnection())));
var f = requestChannel(wsClient, fUri, input.map(this::serializeString))
.map(WebSocketMessage::getPayloadAsText);
var g = requestChannel(wsClient, gUri, f.map(this::serializeString))
.map(WebSocketMessage::getPayloadAsText);
var responses = g.take(requests);
var expectedResponses = Stream.range(0, requests)
.map(i -> "g(f(" + i + "))")
.toJavaArray(String[]::new);
StepVerifier.create(responses)
.expectSubscription()
.expectNext(expectedResponses)
.verifyComplete();
}
解决方案
这似乎对我有用......到目前为止
public static Flux<WebSocketMessage> requestChannel(
WebSocketClient wsClient, URI uri, Flux<WebSocketMessage> outbound) {
CompletableFuture<Flux<WebSocketMessage>> recvFuture = new CompletableFuture<>();
CompletableFuture<Integer> consumerDoneCallback = new CompletableFuture<>();
var executeMono = wsClient.execute(uri,
wss -> {
recvFuture.complete(wss.receive().log("requestChannel.receive " + uri, Level.FINE));
return wss.send(outbound)
.and(Mono.fromFuture(consumerDoneCallback));
}).log("requestChannel.execute " + uri, Level.FINE);
return Mono.fromFuture(recvFuture)
.flatMapMany(recv -> recv.doOnComplete(() -> consumerDoneCallback.complete(1)))
.mergeWith(executeMono.cast(WebSocketMessage.class));
}
如果我还没有偶然发现这个解决方案有任何缺陷,我会很感兴趣
推荐阅读
- openssl - OCSP 响应符号算法
- python - 如何在多个 Flask 应用之间共享 JWT 访问令牌?
- javascript - 编辑单元格时的 ReactDataGrid 强制关闭编辑模式
- c++ - 枚举类变量可以取整范围的整数值吗?
- reactjs - 在任何页面中重新加载时如何正确重新获取数据?
- python - xgettext 无法提取标签属性
- c# - 如何将 c# 应用程序连接到 ElasticMQ 容器?
- c# - 更新父实体时,EF Core 正在删除子实体
- postgresql - 如何在 PostgreSQL 中获取计数派生列的总和?
- oracle - ORA-08006: 指定的行不再存在