首页 > 解决方案 > 有没有办法控制在 Reactor Netty 的 TcpClient 中读取的字节数?

问题描述

我正在使用 TcpClient 连接到一个简单的 TCP 回显服务器。消息由 4 个字节的消息大小和消息本身组成。例如,要发送消息“hello”,服务器将期待“0005hello”,并以“0005hello”响应。在负载下测试时(大约 300 多个并发用户),相邻请求有时会导致响应“堆积”,例如发送“0004good”后跟“0003day”可能会导致客户端收到“0004good0003”后跟“day”。

在传统的、非基于 WebFlux 的 TCP 客户端中,通常会从套接字读取前 4 个字节到缓冲区,确定消息 N 的长度,然后从套接字读取后面的 N 个字节到缓冲区,然后返回响应。是否有可能通过使用 TcpClient 的底层 Channel 来实现这种细粒度的控制?

我还考虑过在某些数据结构(队列、StringBuffer 等)中累积响应并让守护进程解析结果的方法,但这在实践中并没有达到预期的性能。

标签: spring-webfluxreactor-netty

解决方案


我通过向LengthFieldBasedFrameDecoderConnection 添加一个类型的处理程序来解决这个问题:

TcpClient.create()
        .host(ADDRESS)
        .port(PORT)
        .doOnConnected((connection) -> {
            connection.addHandler("parseLengthFromFirstFourBytes", new LengthFieldBasedFrameDecoder(9999, 0, 4) {
                @Override
                protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
                    ByteBuf lengthBuffer = buf.copy(0, 4);
                    byte[] messageLengthBytes = new byte[4];
                    lengthBuffer.readBytes(messageLengthBytes);
                    String messageLengthString = new String(messageLengthBytes);
                    return Long.parseLong(messageLengthString);
                }
            });
        })
        .connect()
        .subscribe();

这解决了当应用程序承受足够负载时响应仍然“堆积”(如问题中所述)的警告问题。


推荐阅读