java - Netty 4.1:使用 ContinuationWebSocketFrame 发送多个 WebSocketFrame Fragments 和 Closer
问题描述
我有一个订阅者,它在下一个可用有效负载上接收顺序回调,并在收到最后一个有效负载时接收后续回调。(下面的简化代码)
我想要做的是在每个有效负载回调上发送 WebSocketFrame 片段,并在订阅者完成时关闭整个逻辑帧。
我努力了:
- 以 a 开头,以 a
new TextWebSocketFrame(false, 0, buf)
结束new TextWebSocketFrame(true, 0, buf)
。 - 以 a 开始,以 a
new ContinuationWebSocketFrame(false, 0, buf)
继续new TextWebSocketFrame(false, 0, buf)
并以 a 结束new TextWebSocketFrame(true, 0, buf)
。 - 以 a 开始,以 a
new ContinuationWebSocketFrame(false, 0, buf)
继续new TextWebSocketFrame(false, 0, buf)
并以 a 结束new ContinuationWebSocketFrame(true, 0, buf)
。
.... 以及更多组合,但我的 Chrome 浏览器客户端(版本 76.0.3809.100(官方构建)(64 位))要么报告完全违反 WebSock 协议,要么抱怨已收到多个 ContinuationWebSocketFrames。
您能否指出正确的方向,以正确配置的 WebSocketFrames 的正确顺序使其正常工作?
非常感谢。
static class DataReceiver implements Subscriber<Object> {
private Channel channel;
private final AtomicBoolean firstFrame = new AtomicBoolean(false);
public DataReceiver(Channel channel) {
this.channel = channel;
}
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object obj) {
ByteBuf buf = JSONOps.serialize(obj, channel.alloc().ioBuffer());
if (firstFrame.compareAndSet(false, true)) {
channel.writeAndFlush(new ContinuationWebSocketFrame(false, 0, buf));
} else {
channel.writeAndFlush(new TextWebSocketFrame(false, 0, buf));
}
}
@Override
public void onError(Throwable t) {
log.error("Subscriber Error", t);
// TODO: Handle error
}
@Override
public void onComplete() {
// channel.writeAndFlush(new ContinuationWebSocketFrame(true, 0, Unpooled.EMPTY_BUFFER));
channel.writeAndFlush(new TextWebSocketFrame(true, 0, Unpooled.EMPTY_BUFFER));
}
}
解决方案
它现在正在工作。有2个问题。一个是我的管道中没有WebSocketFrameAggregator。其次,关闭的ContinuationWebSocketFrame必须有一些内容。当我使用Unpooled.EMPTY_BUFFER关闭聚合时,它不起作用。
事实证明,无论如何我都需要在最后插入一个 JSON 数组。
工作(简化类)如下所示:
static class DataReceiver implements Subscriber<Object> {
private Channel channel;
private final AtomicBoolean firstFrame = new AtomicBoolean(false);
public DataReceiver(Channel channel) {
this.channel = channel;
}
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object obj) {
ByteBuf buf = JSONOps.serialize(obj, channel.alloc().ioBuffer(4196));
if (firstFrame.compareAndSet(false, true)) {
ByteBuf b = Unpooled.wrappedBuffer(Unpooled.wrappedBuffer("[".getBytes()), buf);
channel.writeAndFlush(new TextWebSocketFrame(false, 0, b));
} else {
ByteBuf b = Unpooled.wrappedBuffer(Unpooled.wrappedBuffer(",".getBytes()), buf);
channel.writeAndFlush(new ContinuationWebSocketFrame(false, 0, b));
}
}
@Override
public void onError(Throwable t) {
log.error("Subscriber Error", t);
// TODO: Handle error
}
@Override
public void onComplete() {
channel.writeAndFlush(new ContinuationWebSocketFrame(true, 0, Unpooled.wrappedBuffer("]".getBytes())));
}
}
推荐阅读
- .net - Web API 客户端请求未到达 API 的可能原因有哪些
- rx-java2 - RxJava count() 当两个 observable 之一没有完成时
- javascript - SVG 仅在 IE 中覆盖整个画布
- ionic-framework - 如何将 font-awsome 添加到 Ionic 4 项目?
- android - 代码未在应用程序的 onCreate 上执行 - Kotlin/Android
- gradle - gradle ant 排除多个文件
- android - FCM Firebase 推送通知 Android/iOS
- cakephp - CakePHP 3.6 - 在外壳/命令中触发事件
- javascript - 工具提示没有换行
- recursion - 汉诺塔递归解解释