首页 > 解决方案 > Netty 4.1:使用 ContinuationWebSocketFrame 发送多个 WebSocketFrame Fragments 和 Closer

问题描述

我有一个订阅者,它在下一个可用有效负载上接收顺序回调,并在收到最后一个有效负载时接收后续回调。(下面的简化代码)

我想要做的是在每个有效负载回调上发送 WebSocketFrame 片段,并在订阅者完成时关闭整个逻辑帧。

我努力了:

  1. 以 a 开头,以 anew TextWebSocketFrame(false, 0, buf)结束new TextWebSocketFrame(true, 0, buf)
  2. 以 a 开始,以 anew ContinuationWebSocketFrame(false, 0, buf)继续new TextWebSocketFrame(false, 0, buf)并以 a 结束new TextWebSocketFrame(true, 0, buf)
  3. 以 a 开始,以 anew ContinuationWebSocketFrame(false, 0, buf)继续new TextWebSocketFrame(false, 0, buf)并以 a 结束new ContinuationWebSocketFrame(true, 0, buf)

.... 以及更多组合,但我的 Chrome 浏览器客户端(版本 76.0.3809.100(官方构建)(64 位))要么报告完全违反 WebSock 协议,要么抱怨已收到多个 ContinuationWebSocketFrames。

您能否指出正确的方向,以正确配置的 WebSocketFrames 的正确顺序使其正常工作?

非常感谢。

static class DataReceiver implements Subscriber<Object> {
    private Channel channel;
    private final AtomicBoolean firstFrame = new AtomicBoolean(false);

    public DataReceiver(Channel channel) {
        this.channel = channel;
    }

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);          
    }

    @Override
    public void onNext(Object obj) {
        ByteBuf buf = JSONOps.serialize(obj, channel.alloc().ioBuffer()); 
        if (firstFrame.compareAndSet(false, true)) {
            channel.writeAndFlush(new ContinuationWebSocketFrame(false, 0, buf));
        } else {
            channel.writeAndFlush(new TextWebSocketFrame(false, 0, buf));
        }

    }

    @Override
    public void onError(Throwable t) {
        log.error("Subscriber Error", t);
        // TODO: Handle error
    }

    @Override
    public void onComplete() {
//          channel.writeAndFlush(new ContinuationWebSocketFrame(true, 0, Unpooled.EMPTY_BUFFER));
        channel.writeAndFlush(new TextWebSocketFrame(true, 0, Unpooled.EMPTY_BUFFER));

    }       
}

标签: javawebsocketnetty

解决方案


它现在正在工作。有2个问题。一个是我的管道中没有WebSocketFrameAggregator。其次,关闭的ContinuationWebSocketFrame必须有一些内容。当我使用Unpooled.EMPTY_BUFFER关闭聚合时,它不起作用。

事实证明,无论如何我都需要在最后插入一个 JSON 数组。

工作(简化类)如下所示:

static class DataReceiver implements Subscriber<Object> {
    private Channel channel;
    private final AtomicBoolean firstFrame = new AtomicBoolean(false);

    public DataReceiver(Channel channel) {
        this.channel = channel;
    }

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);          
    }

    @Override
    public void onNext(Object obj) {
        ByteBuf buf = JSONOps.serialize(obj, channel.alloc().ioBuffer(4196));
        if (firstFrame.compareAndSet(false, true)) {
            ByteBuf b = Unpooled.wrappedBuffer(Unpooled.wrappedBuffer("[".getBytes()), buf);
            channel.writeAndFlush(new TextWebSocketFrame(false, 0, b));
        } else {
            ByteBuf b = Unpooled.wrappedBuffer(Unpooled.wrappedBuffer(",".getBytes()), buf);
            channel.writeAndFlush(new ContinuationWebSocketFrame(false, 0, b));
        }       
    }

    @Override
    public void onError(Throwable t) {
        log.error("Subscriber Error", t);
        // TODO: Handle error
    }

    @Override
    public void onComplete() {
        channel.writeAndFlush(new ContinuationWebSocketFrame(true, 0, Unpooled.wrappedBuffer("]".getBytes())));
    }       
}

推荐阅读