首页 > 解决方案 > 永久 tokio TCP 流(客户端)

问题描述

前言,您可以跳到下一节

所以我决定为我的新的相对较小的项目尝试 Rust,因为我喜欢它生成一个单一的可执行文件,该可执行文件很容易在我的基于 ARM 的目标上部署,并且在 RAM 和磁盘空间方面的资源相对较少。我以前没有使用 Rust 的经验,但是有很多其他语言的经验,到目前为止我有点失望。似乎对于许多 Rust 库,可能还有 Rust 本身,API 变化如此之快,以至于网上找到的 90% 的示例代码都无法与最新版本的库(如 等)进行编译tokiotokio-util此外,文档通常具有误导性。例如,如果您用 Google 搜索它LinesCodec会显示在tokio_io::codec::LinesCodectokio::codec::LinesCodectokio_codec::LinesCodectokio_util::codec::LinesCodec,这最终似乎是今天使用的一个。类似的其他东西也有同样的困惑FramedRead,它在某些版本中有一个and_thenmap成员函数,但它们似乎在最新版本中不存在。最后,在 SO 上与 Rust 相关的问题和答案的数量远远少于我使用过的其他语言,这使得开始使用 Rust 变得更加困难。我在过去 2 天尝试做的事情在大多数编程语言中都相对容易解决,我相信在 Rust 中也必须有一个简单的解决方案,但到目前为止我还没有成功。

问题本身

我需要将 TCP 客户端连接到远程服务器,并在数据传入时无限期地逐行读取和处理数据。这需要异步完成,因为同一个进程也充当 HTTP 服务器,所以我使用tokio.

据我了解,比较常见的方法是使用TcpStream,将其分割为 RX/TX 部分,然后我尝试连接 a LinesCodec(with FramedRead),但我无法在不出现编译错误的情况下将所有这些连接在一起。

[dependencies]
futures = "*"
hyper = "*"
tokio = { version = "*", features = ["full"] }
tokio-util = "0.2.0"
tokio-modbus = { version = "*", features = ["tcp", "server", "tcp-server-unstable"], git = "https://github.com/slowtec/tokio-modbus" }
let stream = TcpStream::connect("172.16.100.10:1001").await.unwrap();
let transport = FramedRead::new(stream, LinesCodec::new()); // need to split?
/* ... what to do next to process incoming data line-by-line ...? */

到目前为止,我提出了这个解决方案,但不确定它有多好

tokio::spawn(async {
    let connection = TcpStream::connect("172.16.100.10:1001").await.unwrap();
    let mut reader = BufReader::new(connection);

    loop {
        let mut line = String::new();
        reader.read_line(&mut line).await.unwrap();
        println!("{}", line);
    }
});

标签: rustrust-tokio

解决方案


带有 Cargo.toml 的简单应用程序,例如:

[dependencies]
tokio = { version = "0.3", features = ["full"] }
tokio-util = { version = "0.4", features = ["codec"] }

和 main.rs 之类的:

use tokio::net::{TcpListener, TcpStream };
use tokio_util::codec::{ Framed, LinesCodec };
use tokio::stream::StreamExt;
use std::error::Error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let args: Vec<String> = std::env::args().collect();

    if args[1] == "server"
    {
        let local_addr: String = format!("{}{}",":::",args[2]); // app <server | client> <port>

        let listener = TcpListener::bind(&local_addr).await?;

        while let Ok((socket, peer)) = listener.accept().await {

            tokio::spawn(async move {
                println!("Client Connected from: {}",peer.to_string());
                let mut client = Framed::new(socket, LinesCodec::new_with_max_length(1024));
        
                while let Some(Ok(line)) = client.next().await {
                    println!("{}", line);
                }
            });
        }
    }
    else if args[1] == "client"
    {
        let port = args[2].parse::<u16>().unwrap(); // app client <port>
        let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); 
        let conn = TcpStream::connect(saddr).await?;

        let mut server = Framed::new(conn, LinesCodec::new_with_max_length(1024));

        while let Some(Ok(line)) = server.next().await {
            println!("{}", line);
        }
    }

    Ok({})
}

作为服务器运行:

cargo run server 8080
(in another shell) nc localhost 8080

作为客户端运行:

(in another shell) nc -l -p 8080
cargo run client 8080

推荐阅读