首页 > 解决方案 > Netty:TCP客户端服务器文件传输:异常TooLongFrameException:

问题描述

我是 netty 的新手,我正在尝试设计一个如下解决方案,用于通过 TCP 将文件从服务器传输到客户端:

1. Zero copy based file transfer in case of non-ssl based transfer (Using default region of the file)
2. ChunkedFile transfer in case of SSL based transfer.

客户端 - 服务器文件传输以这种方式工作:

1. The client sends the location of the file to be transfered
2. Based on the location (sent by the client) the server transfers the file to the client

文件内容可以是任何内容(字符串 /image /pdf 等)和任何大小。

现在,我得到了这个TooLongFrameException: 在服务器端,虽然服务器只是解码从客户端接收到的路径,以运行下面提到的代码(服务器/客户端)。

io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 65536: 215542494061 - discarded
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:522)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:500)

现在,我的问题是:

  1. 我对编码器和解码器的顺序及其配置有误吗?如果是这样,将其配置为从服务器接收文件的正确方法是什么?
  2. 我浏览了一些相关的 StackOverflow 帖子SO Q1SO Q2SO Q3SO Q4。我了解了LengthFieldBasedDecoder,但我不知道如何在服务器(编码端)配置其对应的LengthFieldPrepender。它甚至是必需的吗?

请指出我正确的方向。

文件客户端:

public final class FileClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", SSL ? "8992" : "8023"));
    static final String HOST = System.getProperty("host", "127.0.0.1");

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the client
        EventLoopGroup group = new NioEventLoopGroup();

        try {

            Bootstrap b = new Bootstrap();
            b.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (sslCtx != null) {
                        pipeline.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                    }
                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(64*1024, 0, 8));
                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                    pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                    pipeline.addLast(new ObjectEncoder());
                    pipeline.addLast( new FileClientHandler());                }
             });


            // Start the server.
            ChannelFuture f = b.connect(HOST,PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            group.shutdownGracefully();
        }
    }
}

文件客户端处理程序:

public class FileClientHandler extends ChannelInboundHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        String filePath = "/Users/Home/Documents/Data.pdf";
        ctx.writeAndFlush(Unpooled.wrappedBuffer(filePath.getBytes()));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("File Client Handler Read method...");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();

    }
}

文件服务器:

/**
 * Server that accept the path of a file and echo back its content.
 */
public final class FileServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", SSL ? "8992" : "8023"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true).handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            if (sslCtx != null) {
                                pipeline.addLast(sslCtx.newHandler(ch.alloc()));
                            }
                            pipeline.addLast("frameDecoder",new LengthFieldBasedFrameDecoder(64*1024, 0, 8));
                            pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                            pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(new ObjectEncoder());

                            pipeline.addLast(new ChunkedWriteHandler());
                            pipeline.addLast(new FileServerHandler());
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

文件服务器处理程序:

public class FileServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        RandomAccessFile raf = null;
        long length = -1;
        try {
            ByteBuf buff = (ByteBuf)obj;

            byte[] bytes = new byte[buff.readableBytes()];
            buff.readBytes(bytes);

            String msg = new String(bytes);

            raf = new RandomAccessFile(msg, "r");
            length = raf.length();
        } catch (Exception e) {
            ctx.writeAndFlush("ERR: " + e.getClass().getSimpleName() + ": " + e.getMessage() + '\n');
            return;
        } finally {
            if (length < 0 && raf != null) {
                raf.close();
            }
        }

        if (ctx.pipeline().get(SslHandler.class) == null) {
            // SSL not enabled - can use zero-copy file transfer.
            ctx.writeAndFlush(new DefaultFileRegion(raf.getChannel(), 0, length));
        } else {
            // SSL enabled - cannot use zero-copy file transfer.
            ctx.writeAndFlush(new ChunkedFile(raf));
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        System.out.println("Exception server.....");
    }
}

我从这里引用了Netty In Action和代码示例

标签: tcpnettyfile-transfer

解决方案


您的服务器/客户端有很多问题。首先是 SSL,对于客户端,您不需要为服务器初始化 SslContext,而是您可以执行以下操作:

sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();

在服务器端,您使用 aSelfSignedCertificate本身并没有错,但想提醒您,它只能用于调试目的,而不应用于生产。此外ChannelOption.SO_KEEPALIVE,不推荐使用 which,因为 keepalive 间隔取决于操作系统。此外,您添加Object En-/Decoder到您的管道中,在您的情况下不会做任何有用的事情,因此您可以删除它们。

由于参数列表LengthFieldBasedFrameDecoder不完整且错误,您还配置了错误。在netty 文档中,您需要定义and的构造函数版本。除了不剥离长度字段之外,您还定义了错误,它应该与您的4 字节相同。总之,您可以像这样使用构造函数:lengthFieldLengthinitialBytesToStriplengthFieldLengthLengthFieldPrependerlengthFieldLength

new LengthFieldBasedFrameDecoder(64 * 1024, 0, 4, 0, 4)

Charset在您的两个处理程序中,您在编码/解码时都没有指定String可能会导致问题的时间,因为如果没有定义“字符集”,则将使用系统默认值,这可能会有所不同。你可以这样做:

//to encode the String
string.getBytes(StandardCharsets.UTF_8);

//to decode the String
new String(bytes, StandardCharsets.UTF_8);

此外,您尝试使用DefaultFileRegionif no SslHandlerwas added to the pipeline,如果您没有添加,那会很好,LengthFieldHandler因为他们需要 byte[] 的内存副本来发送以添加长度字段。此外,我建议使用 theChunkedNioFile而不是 the,ChunkedFile因为它是非阻塞的,这总是一件好事。你会这样做:

new ChunkedNioFile(randomAccessFile.getChannel())

关于如何解码a 的最后一件事,ChunkedFile因为它被分成块,您可以简单地使用简单的 OutputStream 将它们组合在一起。这是我的一个旧文件处理程序:

public class FileTransferHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private final Path path;
    private final int size;
    private final int hash;

    private OutputStream outputStream;
    private int writtenBytes = 0;
    private byte[] buffer = new byte[0];

    protected FileTransferHandler(Path path, int size, int hash) {
        this.path = path;
        this.size = size;
        this.hash = hash;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
        if(this.outputStream == null) {
            Files.createDirectories(this.path.getParent());
            if(Files.exists(this.path))
                Files.delete(this.path);
            this.outputStream = Files.newOutputStream(this.path, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }

        int size = byteBuf.readableBytes();
        if(size > this.buffer.length)
            this.buffer = new byte[size];
        byteBuf.readBytes(this.buffer, 0, size);

        this.outputStream.write(this.buffer, 0, size);
        this.writtenBytes += size;

        if(this.writtenBytes == this.size && MurMur3.hash(this.path) != this.hash) {
            System.err.println("Received file has wrong hash");
            return;
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if(this.outputStream != null)
            this.outputStream.close();
    }
}

推荐阅读