java - 如何创建 Reactor Netty 热流
问题描述
我正在尝试找到一种创建热流的方法,我可以在其中以一种方法插入数据,而订阅者可以通过另一种方法获取数据。我已成功使用 a WorkQueueProcessor
,但我不确定这是否是正确的做法。是否可以使用 Flux.create 做同样的事情?这是我的工作片段:
- 称呼
connect();
向服务器发送字节数据,客户端将收到
tcp
服务器的响应并workQueueProcessor
发出数据。@Component @RequiredArgsConstructor public class TcpCli { @Setter private Connection connection; private NettyOutbound out; //Creation of Work Queue Processor, can a Flux.create here can do the same job ? private WorkQueueProcessor<String> workQueueProcessor = WorkQueueProcessor.<String>builder().build(); public Mono<? extends Connection> connect() { return TcpClient.create() .host(tcpConfig.getHost()) .port(tcpConfig.getPort()) .handle(this::handleConnection) .connect(); } public Mono<String> sendData(ByteArray data) { out.sendByteArray(Mono.just(data)).then().subscribe(); //Get emitted data from workQueueProcessor return workQueueProcessor.next(); } private Publisher<Void> handleConnection(NettyInbound in, NettyOutbound out) { this.out = out; in.receive().asString() .log("In received") .subscribe(str -> { LOGGER.info(String.format("Inbound: %s", str)); //Emit data to workQueueProcessor workQueueProcessor.onNext(str); }); return out .neverComplete() //keep connection alive .log("Never close"); } }
解决方案
你能解释一下@Setter 在这里做了什么吗?我尝试相同的代码,它总是给出
@Setter
private Connection connection;
private NettyOutbound out;
推荐阅读
- lua - 循环和嵌套 if 语句的问题
- sql - 如何总结唯一的返回值
- ios - 核心数据中的默认值
- firebase - 如何在flutter中从firebase auth获取所有用户ID
- css - 无法一次显示一张幻灯片
- c# - 为什么会出现异常进程无法访问文件 image0.gif',因为它正在被另一个进程使用,即使我先处理它?
- javascript - 打印带有至少 4 个辅音的单词 javascripts
- excel - 在 Excel VBA 中,我如何计算总限制不应超过 500 美元的值的总和,然后得到相应的产品组合?
- python - 通过传递脚本名称在 Python 中执行多个脚本
- python - 如何使用另一个系列作为过滤器过滤一个系列中的数据?