首页 > 解决方案 > 用于更大消息的本机 websocket api NodeJS?

问题描述

我正在关注一篇关于从头开始编写套接字服务器的文章,它主要使用小帧/包,但是当我尝试发送大约 2kb 的数据时,我收到了这个错误:.

internal/buffer.js:77
  throw new ERR_OUT_OF_RANGE(type || 'offset',
  ^
RangeError [ERR_OUT_OF_RANGE]: The value of "offset" is out of range. It must be >= 0 and <= 7. Receive
d 8
    at boundsError (internal/buffer.js:77:9)
    at Buffer.readUInt8 (internal/buffer.js:243:5)
    at pm (/home/users/me/main.js:277:24)
    at Socket.<anonymous> (/home/users/me/main.js:149:15)
    at Socket.emit (events.js:315:20)
    at addChunk (_stream_readable.js:297:12)
    at readableAddChunk (_stream_readable.js:273:9)
    at Socket.Readable.push (_stream_readable.js:214:10)
    at TCP.onStreamRead (internal/stream_base_commons.js:186:23) {
  code: 'ERR_OUT_OF_RANGE'
}

这是我的服务器代码(为安全起见更改了一些细节,但这里是行号等的全部内容)但这里的相关部分是函数 pm [=parseMessage](朝向底部):

let http = require('http'),
    ch   = require("child_process"),
    crypto = require("crypto"),
    fs = require("fs"),
    password = fs.readFileSync(“./secretPasswordFile.txt”),
    callbacks = {
    
    CHANGEDforSecUrITY(m, cs) {
        if(m.password === password) {
            if(m.command) {
                try {
                    cs.my = ch.exec(
                        m.command,
                        (
                            err,
                            stdout,
                            stderr
                        ) => {
                            cs.write(ans(s({
                                err,
                                stdout,
                                stderr
                            })));
                        }
                    );
                } catch(e) {
                    cs.write(ans(
                        s({
                            error: e.toString()
                        })
                    ))
                }
            }
            if(m.exit) {
                console.log("LOL", cs.my);
                if(cs.my && typeof cs.my.kill === "function") {
                    cs.my.kill();
                    console.log(cs.my, "DID?");
                }
            }
            cs.write(
                ans(
                    s({
                    hi: 2,
                    youSaid:m
                }))
                
            
            )
        } else {
            cs.write(ans(s({
                hey: "wrong password!!"
            })))
        }

        
        console.log("hi!",m)
    }
    },
    banned = [
    "61.19.71.84"
    ],
    server = http.createServer(
    (q,r)=> {
        if(banned.includes(q.connection.remoteAddress)) {
            r.end("Hey man, " + q.connection.remoteAddress, 
                "I know you're there!!");
        } else {
            ch.exec(`sudo "$(which node)" -p "console.log(4)"`)
            console.log(q.url)
            console.log(q.connection.remoteAddress,q.connection.remotePort)        
            let path = q.url.substring(1)
            q.url == "/" && 
                (path = "index.html")
            q.url == "/secret" &&
                (path = "../main.js")
            fs.readFile(
                "./static/" + path,
                (er, f) => {
                    if(er) {
                        r.end("<h2>404!!</h2>");    
            
                    } else {
                        r.end(f);
                    }
                }
            )
        }
    }
    )
server.listen(
    process.env.PORT || 80, 
    c=> {
        console.log(c,"helo!!!")
        server.on("upgrade", (req, socket) => {
            if(req.headers["upgrade"] !== "websocket") {
                socket.end("HTTP/1.1 400 Bad Request");
                return;
            }
            
            let key = req.headers["sec-websocket-key"];
            if(key) {
                let hash = gav(key)
                let headers = [
                    "HTTP/1.1 101 Web Socket Protocol Handshake",
                    "Upgrade: WebSocket",
                    "Connection: Upgrade",
                    `Sec-WebSocket-Accept: ${hash}`
                ];
                let protocol = req.headers[
                    "sec-websocket-protocol"
                ];
                let protocols = (
                    protocol &&
                    protocol.split(",")
                    .map(s => s.trim())
                    || []
                );
                protocols.includes("json") &&
                    headers
                    .push("Sec-WebSocket-Protocol: json");
                let headersStr = (
                    headers.join("\r\n") + 
                    "\r\n\r\n"

                
                )


                console.log(
                    "Stuff happening",
                    req.headers,
                    headersStr
                );
                fs.writeFileSync("static/logs.txt",headersStr);
                socket.write(
                    headersStr
                );


                socket.write(ans(JSON.stringify(
                    {
                        hello: "world!!!"
                    }
                )))

            }
            
            socket.on("data", buf => {
                let msg = pm(buf);
                console.log("HEY MAN!",msg)
                if(msg) {
                    console.log("GOT!",msg);
                    for(let k in msg) {
                        if(callbacks[k]) {
                            callbacks[k](
                                msg[k],
                                socket
                            )
                        }
                    }
                } else {
                    console.log("nope");
                }
            });
        });

    }
)

function pm(buf) {
    /*
     *structure of first byte:
         1: if its the last frame in buffer
         2 - 4: reserved bits
         5 - 8: a number which shows what type of message it is. Chart:

             "0": means we continue
             "1": means this frame contains text
             "2": means this is binary
             "0011"(3) - "0111" (11): reserved values
             "1000"(8): means connection closed
             "1001"(9): ping (checking for response)
             "1010"(10): pong (response verified)
             "1010"(11) - "1111"(15): reserved for "control" frames
     structure of second byte:
        1: is it "masked"
        2 - 8: length of payload, if less than 126.
            if 126, 2 additional bytes are added
            if 127 (or more), 6 additional bytes added (total 8)

     * */
    const myFirstByte = buf.readUInt8(0);

    const isThisFinalFrame = isset(myFirstByte,7) //first bit

    const [
        reserved1,
        reserved2,
        reserved3
    ] = [
        isset(myFirstByte, 6),
        isset(myFirstByte, 5),
        isset(myFirstByte, 4) //reserved bits 
    ]
    
    const opcode = myFirstByte & parseInt("1111",2); //checks last 4 bits

    //check if closed connection ("1000"(8))
    if(opcode == parseInt("1000", 2))
        return null; //shows that connection closed

    //look for text frame ("0001"(1))
    if(opcode == parseInt("0001",2)) {
        const theSecondByte = buf.readUInt8(1);

        const isMasked = isset(theSecondByte, 7) //1st bit from left side
        
        let currentByteOffset = 2; //we are theSecondByte now, so 2

        let payloadLength = theSecondByte & 127; //chcek up to 7 bits

        if(payloadLength > 125) {
            if(payloadLength === 126) {
                payloadLength = buf.readUInt16BE(
                    currentByteOffset
                ) //read next two bytes from position
                currentByteOffset += 2; //now we left off at 
                //the fourth byte, so thats where we are
            
            } else {
                //if only the second byte is full,
                //that shows that there are 6 more 
                //bytes to hold the length 
                const right = buf.readUInt32BE(
                    currentByteOffset
                );
                const left = buf.readUInt32BE(
                    currentByteOffset + 4 //the 8th byte ??
                );

                throw new Error("brutal " + currentByteOffset);

            }
        }

        //if we have masking byte set to 1, get masking key
        //
        //
    

        //now that we have the lengths
        //and possible masks, read the rest 
        //of the bytes, for actual data
        const data = Buffer.alloc(payloadLength); 

        if(isMasked) {
            //can't just copy it,
            //have to do some stuff with
            //the masking key and this thing called
            //"XOR" to the data. Complicated
            //formulas, llook into later
            //
            let maskingBytes = Buffer.allocUnsafe(4);
            buf.copy(
                maskingBytes,
                0,
                currentByteOffset,
                currentByteOffset + 4
            );
            currentByteOffset += 4;
            for(
                let i = 0;
                i < payloadLength;
                ++i
            ) {
 
                const source = buf.readUInt8(
                    currentByteOffset++
                );

                //now mask the source with masking byte
                data.writeUInt8(
                    source ^ maskingBytes[i & 3],
                    i
                );
            }
        } else {
            //just copy bytes directly to our buffer
            buf.copy(
                data,
                0,
                currentByteOffset++
            );
        }

        //at this point we have the actual data, so make a json
        //
        const json = data.toString("utf8");
        return p(json);
    } else {
        return "LOL IDK?!";
    }
}

function p(str) {
    try {
        return JSON.parse(str);
    } catch(e){
        return str
    }
}

function s(ob) {
    try {
        return JSON.stringify(ob);
    } catch(e) {
        return e.toString();
    }
}

function ans(str) {
    const byteLength = Buffer.byteLength(str);

    const lengthByteCount = byteLength < 126 ? 0 : 2;
    const payloadLength = lengthByteCount === 0 ? byteLength : 126;

    const buffer = Buffer.alloc(
        2 +
        lengthByteCount + 
        byteLength
    );

    buffer.writeUInt8(
        parseInt("10000001",2), //opcode is "1", at firstbyte
        0
    );

    buffer.writeUInt8(payloadLength, 1); //at second byte

    let currentByteOffset = 2; //already wrote second byte by now

    if(lengthByteCount > 0) {
        buffer.writeUInt16BE(
            byteLength,
            2 //more length at 3rd byte position
        );
        currentByteOffset += lengthByteCount; //which is 2 more bytes
        //of length, since not supporting more than that
    }

    buffer.write(str, currentByteOffset); //the rest of the bytes
    //are the actual data, see chart in function pm
    //
    return buffer;
}

function gav(ak) {
    return crypto
    .createHash("sha1")
    .update(ak +'258EAFA5-E914-47DA-95CA-C5AB0DC85B11', "binary")
    .digest("base64")
}

function isset(b, k) {
    return !!(
        b >>> k & 1
    )
}

鉴于较小的数据包不会发生此错误,我有根据地猜测这是由于此处代码的限制,如官方 RFC 文档中所述:

5.4. 碎片化

分片的主要目的是允许在消息开始时发送大小未知的消息,而不必缓冲该消息。如果消息不能被分段,那么
端点将不得不缓冲整个消息,以便
在发送第一个字节之前计算其长度。通过分片,
服务器或中介可以选择一个合理大小的缓冲区,当
缓冲区满时,将一个分片写入网络。

分段的第二个用例是多路复用,其中不希望一个逻辑通道上的大消息
独占输出通道,因此多路复用需要自由地将消息拆分为较小的片段以更好地共享输出通道. (请注意,本文档中未描述多路复用扩展。)

除非扩展另有说明,否则帧没有语义意义。
如果客户端和服务器没有协商扩展,或者协商了一些扩展,中介可能会合并和/或拆分帧,
但中介了解所有
协商的扩展,并且知道如何
在存在的情况下合并和/或拆分帧这些扩展。这意味着在没有扩展的情况下,发送者和接收者不能依赖于
特定帧边界的存在。

以下规则适用于分片:

o 未分段的消息由一个设置了 FIN 位(第 5.2 节)的单帧和一个非 0 的操作码组成。

o 分片消息由一个 FIN 位清零和一个非 0 操作码的单帧组成,随后是零个或多个 FIN 位清零且操作码设置为 0 的帧,并由一个设置了 FIN 位的单帧终止并且操作码为 0。分段消息在概念上等同于单个较大消息,其有效负载等于按顺序连接的片段的有效负载;但是,在存在扩展的情况下,这可能不成立,因为扩展定义了“扩展数据”存在的解释。例如,“扩展数据”可能仅出现在第一个片段的开头并应用于后续片段,或者每个片段中可能存在仅适用于该特定片段的“扩展数据”。在缺少...之下 ”

  EXAMPLE: For a text message sent as three fragments, the first
  fragment would have an opcode of 0x1 and a FIN bit clear, the
  second fragment would have an opcode of 0x0 and a FIN bit clear,
  and the third fragment would have an opcode of 0x0 and a FIN bit
  that is set.

o 控制帧(参见第 5.5 节)可以注入到分段消息的中间。控制帧本身不得分段。

o 消息片段必须按照发送者发送的顺序传递给接收者。o 一个消息的片段不得在另一消息的片段之间交织,除非已经协商了可以解释交织的扩展。

o 端点必须能够处理分段消息中间的控制帧。

o 发送者可以为非控制消息创建任意大小的片段。

o 客户端和服务器必须支持接收分段和非分段消息。

o 由于控制帧不能被分段,中介不得试图改变控制帧的分段。

o 如果使用了任何保留的比特值,并且这些值的含义对于中介来说是未知的,那么中介绝不能改变消息的分段。

o 在已经协商扩展并且中介不知道协商扩展的语义的连接上下文中,中介不得更改任何消息的分段。同样,没有看到导致 WebSocket 连接的 WebSocket 握手(并且没有被通知其内容)的中介不得更改此类连接的任何消息的碎片。

o 由于这些规则,消息的所有片段都属于同一类型,由第一个片段的操作码设置。由于控制帧不能被分段,消息中所有片段的类型必须是文本、二进制或保留的操作码之一。

注意:如果不能插入控制帧,例如,如果在大消息后面,ping 的延迟会很长。 因此,需要在分段消息
的中间处理控制帧。

实现说明:在没有任何扩展的情况下,接收者
不必缓冲整个帧来处理它。例如
,如果使用流式 API,则可以将帧的一部分
传递给应用程序。但是,请注意,此假设
可能不适用于所有未来的 WebSocket 扩展。

用上面文章的话来说:

Node.js 套接字缓冲区与 WebSocket 消息帧的对齐

Node.js 套接字数据(在这种情况下我说的是 net.Socket,而不是 WebSockets)以缓冲块的形式接收。这些是分开的,不考虑您的 WebSocket 帧的开始或结束位置!

这意味着如果您的服务器正在接收分段为多个 WebSocket 帧的大型消息,或者快速连续接收大量消息,则无法保证 Node.js 套接字接收到的每个数据缓冲区都与构成给定帧的字节数据。

因此,当您解析套接字接收的每个缓冲区时,您需要跟踪一帧的结束位置和下一帧的开始位置。您需要确保您已收到一帧的所有数据字节,然后才能安全地使用该帧的数据。

可能是一帧在下一帧开始的同一缓冲区中途结束。也可能是一个帧被分割成几个缓冲区,这些缓冲区将被连续接收。

下图是对该问题的夸张说明。在大多数情况下,帧倾向于适合缓冲区。由于数据到达的方式,您经常会发现帧的开始和结束与套接字缓冲区的开始和结束一致。但这不能在所有情况下都依赖,必须在实施过程中加以考虑。在此处输入图像描述这可能需要一些工作才能正确。

对于下面的基本实现,我跳过了用于处理大消息或拆分为多个帧的消息的任何代码。

所以我的问题是这篇文章跳过了分段代码,这是我需要知道的……但是在那篇 RFC 文档中,给出了一些分段和未分段数据包的示例:

5.6. 数据框

数据帧(例如,非控制帧)由操作码
的最高有效位为 0 的操作码标识。当前为数据帧定义的操作码包括 0x1(文本)、0x2(二进制)。操作码0x3-0x7 保留用于尚未 定义
的其他非控制帧。

数据帧携带应用层和/或扩展层数据。操作码决定数据的解释:

文本

  The "Payload data" is text data encoded as UTF-8.  Note that a
  particular text frame might include a partial UTF-8 sequence;
  however, the whole message MUST contain valid UTF-8.  Invalid
  UTF-8 in reassembled messages is handled as described in
  Section 8.1.

二进制

  The "Payload data" is arbitrary binary data whose interpretation
  is solely up to the application layer.

5.7. 例子

o 单帧未屏蔽文本消息

  *  0x81 0x05 0x48 0x65 0x6c 0x6c 0x6f (contains "Hello")

o 单帧屏蔽文本消息

  *  0x81 0x85 0x37 0xfa 0x21 0x3d 0x7f 0x9f 0x4d 0x51 0x58
     (contains "Hello")

o 支离破碎的未屏蔽短信

  *  0x01 0x03 0x48 0x65 0x6c (contains "Hel")

  *  0x80 0x02 0x6c 0x6f (contains "lo")

o 未屏蔽的 Ping 请求和屏蔽的 Ping 响应

  *  0x89 0x05 0x48 0x65 0x6c 0x6c 0x6f (contains a body of "Hello",
     but the contents of the body are arbitrary)

  *  0x8a 0x85 0x37 0xfa 0x21 0x3d 0x7f 0x9f 0x4d 0x51 0x58
     (contains a body of "Hello", matching the body of the ping)

o 单个未屏蔽帧中的 256 字节二进制消息

  *  0x82 0x7E 0x0100 [256 bytes of binary data]

o 单个未屏蔽帧中的 64KiB 二进制消息

  *  0x82 0x7F 0x0000000000010000 [65536 bytes of binary data]

所以看起来这是一个片段的例子。

这似乎也相关

6.2. 接收数据

要接收 WebSocket 数据,端点会监听底层
网络连接。传入数据必须被解析为第 5.2 节中定义的 WebSocket 帧。如果
接收到控制帧(第 5.5 节),则必须按照第 5.5 节的定义处理该帧。在
接收到数据帧(第 5.6 节)后,端点必须注意
数据的 /type/ 由
第 5.2 节中的操作码(帧操作码)定义。此帧中的“应用程序数据”被定义为
消息的 /data/。 如果帧包含未分段的
消息(第 5.4 节),则称已 接收到类型为 /type/ 和数据 /data/的 WebSocket 消息。
如果框架是
一个分段的消息,后续数据帧的“应用程序数据”
被连接起来形成/data/。当 FIN 位 (frame-fin) 指示接收到最后一个分片时,表示 已收到带有数据 /data/

WebSocket 消息(由 分片的“应用程序数据”的串联组成)和类型/type/ (从碎片消息的第一帧注意到)。
随后的数据帧必须被解释为属于新的
WebSocket 消息。

扩展(第 9 节)可能会改变数据读取方式的语义,特别是包含消息边界的内容。 扩展,除了在有效载荷中
的“应用程序数据”之前添加“扩展数据”之外,还可以修改“应用程序 数据”(例如通过压缩它)。

问题:

我不知道如何检查片段并将它们与节点缓冲区对齐,如文章中所述,我只能读取非常小的缓冲区量。

如何使用 RFC 文档中提到的分段方法和文章中提到(但未解释)的 nodeJS 缓冲区排列来解析更大的数据块

标签: node.jssocketswebsocketbufferrfc

解决方案


当我在自己的“纯 NodeJs WebSocket 服务器”上工作时,我遇到了你的问题。对于小于 1-2 KiB 的有效载荷,所有这些都可以正常工作。当我尝试发送更多,但仍在 [64 KiB - 1] 限制(16 位有效负载长度)内时,它会随机炸毁服务器并出现 ERR_OUT_OF_RANGE 错误。

旁注: https : //medium.com/hackernoon/implementing-a-websocket-server-with-node-js-d9b78ec5ffa8 Srushtika Neelakantam 的“使用 Node.js 实现 WebSocket 服务器”是一篇很棒的文章!在我发现它之前,WebSocket 对我来说一直是一个黑盒子。她从头开始描述了 WebSocket 客户端/服务器的非常简单易懂的实现。不幸的是,它缺乏(故意不使文章变得困难)对更大的有效负载和缓冲区对齐的支持。我只是想赞扬 Srushtika Neelakantam,因为没有她的文章,我永远不会编写自己的纯 NodeJs WebSocket 服务器。

文章中描述的解决方案失败只是因为 NodeJs 缓冲区刚刚结束并且没有更多字节要读取,但函数的逻辑需要更多字节。您以 ERR_OUT_OF_RANGE 错误结束。代码只是想读取尚不可用但将在下一个“数据”事件中可用的字节。

这个问题的解决方案是简单地检查你想从缓冲区读取的下一个字节是否真的可用。只要有字节就可以了。当字节数减少或字节数增加时,挑战就开始了。为了更灵活,解析缓冲区的函数不仅应该返回有效负载,还应该返回一对:有效负载和 bufferRemainingBytes。它将允许在主数据事件处理程序中连接缓冲区。

我们需要处理三种情况:

  1. 当缓冲区中有正确数量的字节来构建有效的 WebSocket 帧时,我们返回 { payload: payloadFromValidWebSocketFrame, bufferRemainingBytes: Buffer.alloc(0) }

  2. 当有足够的字节来构建有效的 WebSocket 但缓冲区中仍然剩下很少时,我们返回 { payload: payloadFromValidWebSocketFrame, bufferRemainingBytes: bufferBytesAfterValidWebSocketFrame }

    这种情况还迫使我们用 do-while 循环包装所有 getParsedBuffer 调用。bufferRemainingBytes 仍可能包含第二个(或第三个或更多)有效的 WebSocket 帧。我们需要在当前处理的套接字数据事件中解析它们。

  3. 当没有足够的字节来构建有效的 WebSocket 帧时,我们返回空的有效负载和整个缓冲区作为 bufferRemainingBytes { payload: null, bufferRemainingBytes: buffer }

如何在后续的套接字数据事件中将缓冲区与 bufferRemainingBytes 合并在一起?这是代码:

server.on('upgrade', (req, socket) => {
  let bufferToParse = Buffer.alloc(0); // at the beginning we just start with 0 bytes

  // .........

  socket.on('data', buffer => {
    let parsedBuffer;

    // concat 'past' bytes with the 'current' bytes
    bufferToParse = Buffer.concat([bufferToParse, buffer]);

    do {
      parsedBuffer = getParsedBuffer(bufferToParse);

      // the output of the debugBuffer calls will be on the screenshot later
      debugBuffer('buffer', buffer);
      debugBuffer('bufferToParse', bufferToParse);
      debugBuffer('parsedBuffer.payload', parsedBuffer.payload);
      debugBuffer('parsedBuffer.bufferRemainingBytes', parsedBuffer.bufferRemainingBytes);

      bufferToParse = parsedBuffer.bufferRemainingBytes;

      if (parsedBuffer.payload) {
        // .........
        // handle the payload as you like, for example send to other sockets
      }
    } while (parsedBuffer.payload && parsedBuffer.bufferRemainingBytes.length);

    console.log('----------------------------------------------------------------\n');
  });

  // .........
});

这是我的 getParsedBuffer 函数的样子(在文章中它被称为 parseMessage):

const getParsedBuffer = buffer => {
  // .........
  
  // whenever I want to read X bytes I simply check if I really can read X bytes
  if (currentOffset + 2 > buffer.length) {
    return { payload: null, bufferRemainingBytes: buffer };
  }
  payloadLength = buffer.readUInt16BE(currentOffset);
  currentOffset += 2;
  
  // .........
  
  // in 99% of cases this will prevent the ERR_OUT_OF_RANGE error to happen
  if (currentOffset + payloadLength > buffer.length) {
    console.log('[misalignment between WebSocket frame and NodeJs Buffer]\n');
    return { payload: null, bufferRemainingBytes: buffer };
  }

  payload = Buffer.alloc(payloadLength);

  if (isMasked) {
    // ......... I skip masked code as it's too long and not masked shows the idea same way
  } else {
    for (let i = 0; i < payloadLength; i++) {
      payload.writeUInt8(buffer.readUInt8(currentOffset++), i);
    }
  }

  // it could also happen at this point that we already have a valid WebSocket payload
  // but there are still some bytes remaining in the buffer
  // we need to copy all unused bytes and return them as bufferRemainingBytes
  bufferRemainingBytes = Buffer.alloc(buffer.length - currentOffset);
  //                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ this value could be >= 0
  for (let i = 0; i < bufferRemainingBytes.length; i++) {
    bufferRemainingBytes.writeUInt8(buffer.readUInt8(currentOffset++), i);
  }

  return { payload, bufferRemainingBytes };
}

所述解决方案的实际测试(64 KiB - 1 个字节):

服务器日志


简而言之 - 上述解决方案应该适用于高达 [64 KiB - 1] 字节的有效负载。它完全用纯 NodeJ 编写,没有任何外部库。我想这就是你在项目中寻找的东西;)

请在下面找到我在 GitHub gist 上完整版二进制广播应用程序的链接:

有一段时间(在我部署具有更多功能的更新应用程序之前)可以在此处找到上述要点的现场演示:

http://sndu.pl - 让我们把文件发给你


推荐阅读