java - 当我使用来自 ReplaySubject 的 Observable 时阻止 ChannelHandlerContext
问题描述
我正在编写客户端-服务器应用程序。我正在从数据库中获取数据并将其放入 rxjava2 的 ReplaySubject (ReplaySubject 是必要的,因为我需要保证每个客户端上的数据相同)当客户端连接订阅它时我想将此数据发送给他但是当我尝试它时我的头“可能的方式^_^”它阻塞了。通过块,我的意思是它不发送数据,但是当我关闭服务器时,数据会立即显示在客户端。
我尝试在客户端和服务器端事件循环中添加一些线程(我想可能是线程阻塞,因为我使用“无限”源,所以要接收这个我需要另一个线程或类似的东西)。
服务器端通道代码:
public
class ClientHandler
extends SimpleChannelInboundHandler<DataWrapper> {
private final Observable<DataWrapper> data;
public ClientHandler(Observable<DataWrapper> data) {
this.data = data;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// super.channelRegistered(ctx);
final Channel channel = ctx.channel();
Server
.INSTANCE
.appendToChannelGroup(channel);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// super.channelActive(ctx);
// i believe there is something wrong
data.subscribe(ctx::writeAndFlush);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
// rest skip
}
客户端:
public
class DirectNetworkCommunicator
extends SimpleChannelInboundHandler<DataWrapper> {
private Observable<DataWrapper> generatedData;
private ExecutorService fallbackThread;
DirectNetworkCommunicator(Observable<DataWrapper> generatedData) {
this.fallbackThread = Executors.newSingleThreadExecutor();
this.generatedData = generatedData;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);
DataWrapper inComingData = (DataWrapper) msg;
Adapter
.INSTANCE
.appendFromNettworkData(inComingData);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// super.channelReadComplete(ctx);
ctx.flush();
}
// rest skip
}
所以我之前提到过我希望它在服务器关闭时接收数据而不是接收数据^_^。如果那会帮助 netty 版本 4.1.37 最终版。
解决方案
Ok, so future peoples face the same problem i found answer on my own. Netty from client side use background thread as main for communication witch means I've wait for main thread to release before it can make operation on observable. Hope it helps someone.
推荐阅读
- java - LocalTime() 两次之间的差异
- c++ - 向用户询问指定数量的字符?
- php - 带日期的 CRUD(创建和读取)
- java - Java 应用程序无法在 Mac OSX Sierra 上启动
- python - 从表格中提取文本
- git - 在 master 中维护带有补丁的 fork
- javascript - 重新定义 javascript 类/对象构造函数以分配全局变量
- node.js - 无需等待连接“打开”事件即可导出 Mongoose 模型
- javascript - 从 PHP 到 Javascript 的正则表达式转换问题
- gdb - GDB - 在 gdb 和 OCD Deamon 之间建立通信