project-reactor - Reactor Netty - 如何使用延迟通量发送
问题描述
在 Reactor Netty 中,当通过 向 TCP 通道发送数据时out.send(publisher)
,人们会期望任何发布者都能正常工作。但是,如果我们使用更复杂的带有延迟元素的而不是简单的立即Flux
数,那么它就会停止正常工作。例如,如果我们使用这个 hello world TCP 回显服务器,它会按预期工作:
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.time.Duration;
public class Reactor1 {
public static void main(String[] args) throws Exception {
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.flatMap(s ->
out.sendString(Flux.just(s.toUpperCase()))
))
.bind()
.block();
server.channel().closeFuture().sync();
}
}
但是,如果我们更改out.sendString
为
out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))
那么我们期望对于每个接收到的项目都会产生一个延迟一秒的输出。
但是,服务器的行为方式是,如果它在间隔期间接收到多个项目,它将只为第一个项目生成输出。例如,下面我们在第一秒输入aa
and ,但只生成输出(一秒后):bb
AA
$ nc localhost 3344
aa
bb
AA <after one second>
然后,如果我们稍后键入额外的行,我们会得到输出(一秒钟后),但来自前一个输入:
cc
BB <after one second>
有什么想法可以send()
延迟按预期工作Flux
吗?
解决方案
我认为您不应该为out.sendString(...)
This works 重新创建发布者:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> out
.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(in.receive()
.asString()
.map(String::toUpperCase)
.delayElements(Duration.ofSeconds(1))))
.bind()
.block();
server.channel().closeFuture().sync();
推荐阅读
- r - 在 R 中更改 date.format
- php - 未捕获的错误:在带有 Apache 2.4、PHP 7.4 的 Windows 10 上找不到类“mysqli”
- geospatial - Pysal 问题模块没有属性“打开”
- html - 将 html 作为模板变量传递 django
- javascript - DataTables:初始化后如何设置配置?
- php - WooCommerce 价格显示前缀?
- javascript - 在for循环中获取变量是未定义的错误
- python - 如何让我的阶乘函数更快更高效?
- c - 在C中读取wav文件的左右通道?
- css - 如何使我的侧边栏仅适合外部 div