首页 > 解决方案 > 当我使用来自 ReplaySubject 的 Observable 时阻止 ChannelHandlerContext

问题描述

我正在编写客户端-服务器应用程序。我正在从数据库中获取数据并将其放入 rxjava2 的 ReplaySubject (ReplaySubject 是必要的,因为我需要保证每个客户端上的数据相同)当客户端连接订阅它时我想将此数据发送给他但是当我尝试它时我的头“可能的方式^_^”它阻塞了。通过块,我的意思是它不发送数据,但是当我关闭服务器时,数据会立即显示在客户端。

我尝试在客户端和服务器端事件循环中添加一些线程(我想可能是线程阻塞,因为我使用“无限”源,所以要接收这个我需要另一个线程或类似的东西)。

服务器端通道代码:

public
    class ClientHandler
        extends SimpleChannelInboundHandler<DataWrapper> {


    private final Observable<DataWrapper> data;

    public ClientHandler(Observable<DataWrapper> data) {
        this.data = data;
    }


    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // super.channelRegistered(ctx);
        final Channel channel = ctx.channel();
        Server
            .INSTANCE
            .appendToChannelGroup(channel);

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // super.channelActive(ctx);
        // i believe there is something wrong
        data.subscribe(ctx::writeAndFlush);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    // rest skip
}

客户端:

public
    class DirectNetworkCommunicator
        extends SimpleChannelInboundHandler<DataWrapper> {


    private Observable<DataWrapper> generatedData;
    private ExecutorService fallbackThread;


    DirectNetworkCommunicator(Observable<DataWrapper> generatedData) {
        this.fallbackThread = Executors.newSingleThreadExecutor();
        this.generatedData = generatedData;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // super.channelRead(ctx, msg);
        DataWrapper inComingData = (DataWrapper) msg;
        Adapter
            .INSTANCE
            .appendFromNettworkData(inComingData);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // super.channelReadComplete(ctx);
        ctx.flush();
    }
    // rest skip
}

所以我之前提到过我希望它在服务器关闭时接收数据而不是接收数据^_^。如果那会帮助 netty 版本 4.1.37 最终版。

标签: javanetworkingnettyrx-java2

解决方案


Ok, so future peoples face the same problem i found answer on my own. Netty from client side use background thread as main for communication witch means I've wait for main thread to release before it can make operation on observable. Hope it helps someone.


推荐阅读