首页 > 解决方案 > 来自基于套接字的通信的两个 Netty 套接字应用程序(部分消息)之间的通信问题

问题描述

我在两个 Netty 套接字应用程序(主应用程序和集成测试应用程序)之间进行测试时遇到了通信问题,我收到了异常数量的部分消息。

一个值得注意的模式是,从测试应用程序发送的第一条消息(应用程序执行从管道外部发送消息,使用可共享的处理程序)往往总是部分的。在发生另一个问题的延迟期间也会注意到这一点。

另一个问题是,当有时接收到部分消息时,解码器似乎陷入了一个循环,它继续尝试无限期地读取部分消息。我有一个单元测试来模拟部分消息EmbeddedChannel,但单元测试没有复制我在集成测试期间看到的内容。

主应用程序使用以下管道:

ch.pipeline().addLast(<HeaderTrailerFrameDecoder>, <NettyMessageDecoder>, <HeaderTrailerFrameEncoder>, <NettyMessageEncoder>);
ch.pipeline().addLast(<IdleStateHandler>,<EventHandler>, <ChannelHandler>);

在哪里:

我认为问题可能出在所有编码器/解码器上,也许没有释放我的 ByteBuf 对象?我只看到第一个解码器的问题HeaderTrailerFrameDecoder,所以我将在下面提供一个片段。对于每个连接,来自测试应用程序发送的一系列消息的第一条消息首先生成 log msg="could not find trailer"

@Slf4j // Lombok logging
public class HeaderTrailerFrameDecoder extends ByteToMessageDecoder {

    private final byte header;
    private final byte trailer;

    HeaderTrailerFrameDecoder(byte header, byte trailer) {
        this.header = header;
        this.trailer = trailer;
    }

    @Override
    protected void decode(
            final ChannelHandlerContext ctx,
            final ByteBuf buf,
            final List<Object> out) {
        log.trace("msg=\"decoding message with header and trailer\", buf={}", buf);

        // Find header
        int headerIndex = buf.forEachByte(value -> value != header);
        if (headerIndex < 0) {
            log.error("msg=\"could not find header\", payload=\"{}\"", payload);
            buf.skipBytes(buf.readableBytes());
            return;
        }
        int beforeHeaderLen = headerIndex - buf.readerIndex();
        buf.skipBytes(beforeHeaderLen + 1);


        // Find trailer
        int trailerIndex = buf.forEachByte(value -> value != trailer);
        if (trailerIndex < 0) {
            String payload = debug(buf);
            log.error("msg=\"could not find trailer\"");
            buf.resetReaderIndex();
            return;
        }
        int insideFrameLen = trailerIndex - buf.readerIndex();
        ByteBuf frame = buf.readBytes(insideFrameLen);
        buf.skipBytes(1);

        // Pass message through
        out.add(frame);
    }
}

标签: javanetty

解决方案


推荐阅读