project-reactor - Reactor-netty TCPClient 无法接收响应
问题描述
我正在尝试对缺少文档的最新 reactor-netty 版本进行一些项目前体验;我使用的是 0.8.0.M3 版本。
我用这个 tcp 服务器开发了一个简单的 spring boot 应用程序,它可以正确启动并且似乎可以工作:
@PostConstruct
public void startServer() throws InterruptedException {
TcpServer.create().
host("localhost").
port(1235).
handle((in, out) -> {
Flux<String> fluxString = in.receive().asString().log().
map(text -> {
return "Hi server have received "+text;});
return out.sendString(fluxString).then();
}
).
wiretap().bindNow();
}
如果我尝试使用客户端进行测试,交互似乎是正确的,但我无法收到任何响应:
int counter = 10;
CountDownLatch latch = new CountDownLatch(counter);
Flux<String> input = Flux.range(0, counter).map(i->""+i);
TcpClient.create().
host("localhost").
port(1235).
handle((in, out) -> {
in.receive().subscribe(receiv -> {System.out.println(receiv);latch.countDown();});
return out.sendString(input).neverComplete();
}
).
wiretap().connectNow();
System.out.println("waiting closure");
boolean result = latch.await(5, TimeUnit.SECONDS);
查看窃听日志似乎客户端分别将每个 int 作为字符串发送,而服务器仅接收一个聚合字符串“0123456789”并仅发送一个响应。客户端没有收到任何东西,并且锁存器也没有减少 1 并且保持在 10(我希望至少收到一个聚合响应)。
谁能解释客户端出了什么问题以及如何由服务器分别接收每个整数?
谢谢
解决方案
您可能需要解决一些问题。我想说这对于学习来说有点复杂。
对于服务器:
TcpServer.create()
.host("localhost")
.port(1235)
.doOnConnection(c ->
//The origin input are 0,1,2,3,4,5,6,7,8,9.
//So you need a decoder split every 1 byte as a ByteBuf.
c.addHandler(
"1ByteStringDecoder",
new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(1));
}
}
)
)
.handle((in, out) -> {
Flux<String> fluxString = in.receive()
.asString()
.log()
.map(text -> {
return "Hi server have received " + text;
});
//Since the output is quite small, you need flush it
return out.options(o -> o.flushOnEach())
.sendString(fluxString)
.neverComplete();
}
)
.wiretap()
.bindNow();
对于客户:
int counter = 10;
CountDownLatch latch = new CountDownLatch(counter);
startServer();
Flux<String> input = Flux.range(0, counter)
.map(i -> "" + i);
TcpClient.create()
.host("localhost")
.port(1235)
.doOnConnected(c ->
c.addHandler(
//The covert input are "Hi server have received " + (0,1,2,3,4,5,6,7,8,9).
//So you need a decoder split every 25 byte as a ByteBuf.
"25ByteStringDecoder",
new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(25));
}
}
)
)
.handle((in, out) -> {
in.receive()
.asString()//You need convert ByteBuf to String.
.subscribe(receiv -> {
System.out.println(receiv);
latch.countDown();
});
out.options(o -> o.flushOnEach())
.sendString(input)
.then()
.subscribe(); //You need to ask your client to send the data by subscribe
return Mono.never();
}
)
.wiretap()
.connectNow();
System.out.println("waiting closure");
boolean result = latch.await(5, TimeUnit.SECONDS);
推荐阅读
- mysql - 通过 Laravel 存储 3rd 方数据库/API 凭证的安全方式
- laravel - 我想在 cakephp 框架中使用 laravel 密码哈希算法
- c++ - 这个函数期望从 std::istream 解析什么格式的输入?
- c# - 如何清除 ASP.NET Core 中的 MemoryCache?
- github - Markdown 文件的 YAML 标头在文档顶部生成奇数表
- java - 使用Java循环将字符串转换为单词中每个字母的数字unicode等价物?
- .net - Docker - Windows 容器 - 安装 DotNet 框架 472
- java - Firebase 身份验证成功后,如何进入 Android Studio 中的下一个活动?
- apache-spark - 无法使用火花流从 kafka 主题中读取数据
- ansible - 如何在 terraform 代码中调用 ansible playbook?