首页 > 解决方案 > 使用流处理接收到的数据

问题描述

我正在从套接字接收消息。套接字封装在一个标头(基本上是消息的大小)和一个作为 crc 的页脚(一种检查消息是否未损坏的代码)中

所以,布局是这样的:

size (2 bytes) | message (240 bytes) | crc (4 byte)

我写了一个operator>>

operator>>如下:

std::istream &operator>>(std::istream &stream, Message &msg) {
    std::int16_t size;
    stream >> size;
    stream.read(reinterpret_cast<char*>(&msg), size);

    // Not receive enough data
    if (stream.rdbuf()->in_avail() < dataSize + 4) {
        stream.setstate(std::ios_base::failbit);
        return stream;
    }

    std::int16_t gotCrc;
    stream >> gotCrc;

    // Data not received correctly
    if(gotCrc != computeCrc(msg)) {
        stream.setstate(std::ios_base::failbit);
    }
    return stream;
}

消息可以逐字节到达,也可以完全到达。我们甚至可以一次收到多条消息。

基本上,我所做的是这样的:

struct MessageReceiver {
    std::string totalDataReceived;
    void messageArrived(std::string data) {
        // We add the data to totaldataReceived
        totalDataReceived += data;

        std::stringbuf buf(totalDataReceived);
        std::istream stream(&buf);
        std::vector<Message> messages(
            std::istream_iterator<Message>(stream), 
            std::istream_iterator<Message>{});
        std::for_each(begin(messages), end(messages), processMessage);
        // +4 for crc and + 2 for the size to remove
        auto sizeToRemove = [](auto init, auto message) {return init + message.size + 4 + 2;};
        // remove the proceed messages
        totalDataReceived.remove(accumulate(begin(messages), end(messages), 0, sizeToRemove);
    }
};

所以基本上,我们接收数据,我们将它插入到接收到的数据的总数组中。我们流式传输它,如果我们至少收到一条消息,我们将其从缓冲区中删除totalDataReceived

但是,我不确定这是否是好方法。实际上,当计算错误的 crc 时,此代码不起作用...(未创建消息,因此我们不会对其进行迭代)。所以每次,我都会尝试阅读带有错误 crc 的消息......

我怎样才能做到这一点?我无法保留所有数据,totalDataReceived因为我可以在执行期间收到很多消息。

我应该实现自己的streambuf吗?

标签: c++streamistreamistream-iterator

解决方案


我发现你想要创建的是一个类似于 std::istream 的类。当然你可以选择创建你自己的类,但出于某些原因我更喜欢实现 std::streambuf。

首先,使用你的类的人习惯于使用它,因为如果你继承和实现 std::streambuf 和 std::istream,它的行为与 std::istream 相同。

其次,您不需要创建额外的方法或不需要覆盖运算符。它们已经在 std::istream 的类级别中准备好了。

实现 std::streambuf 所要做的就是继承它,覆盖 underflow() 并使用 setg() 设置 get 指针。


推荐阅读