rust - 永久 tokio TCP 流(客户端)
问题描述
前言,您可以跳到下一节
所以我决定为我的新的相对较小的项目尝试 Rust,因为我喜欢它生成一个单一的可执行文件,该可执行文件很容易在我的基于 ARM 的目标上部署,并且在 RAM 和磁盘空间方面的资源相对较少。我以前没有使用 Rust 的经验,但是有很多其他语言的经验,到目前为止我有点失望。似乎对于许多 Rust 库,可能还有 Rust 本身,API 变化如此之快,以至于网上找到的 90% 的示例代码都无法与最新版本的库(如 等)进行编译tokio
。tokio-util
此外,文档通常具有误导性。例如,如果您用 Google 搜索它LinesCodec
会显示在tokio_io::codec::LinesCodec
、tokio::codec::LinesCodec
和tokio_codec::LinesCodec
tokio_util::codec::LinesCodec
,这最终似乎是今天使用的一个。类似的其他东西也有同样的困惑FramedRead
,它在某些版本中有一个and_then
和map
成员函数,但它们似乎在最新版本中不存在。最后,在 SO 上与 Rust 相关的问题和答案的数量远远少于我使用过的其他语言,这使得开始使用 Rust 变得更加困难。我在过去 2 天尝试做的事情在大多数编程语言中都相对容易解决,我相信在 Rust 中也必须有一个简单的解决方案,但到目前为止我还没有成功。
问题本身
我需要将 TCP 客户端连接到远程服务器,并在数据传入时无限期地逐行读取和处理数据。这需要异步完成,因为同一个进程也充当 HTTP 服务器,所以我使用tokio
.
据我了解,比较常见的方法是使用TcpStream
,将其分割为 RX/TX 部分,然后我尝试连接 a LinesCodec
(with FramedRead
),但我无法在不出现编译错误的情况下将所有这些连接在一起。
[dependencies]
futures = "*"
hyper = "*"
tokio = { version = "*", features = ["full"] }
tokio-util = "0.2.0"
tokio-modbus = { version = "*", features = ["tcp", "server", "tcp-server-unstable"], git = "https://github.com/slowtec/tokio-modbus" }
let stream = TcpStream::connect("172.16.100.10:1001").await.unwrap();
let transport = FramedRead::new(stream, LinesCodec::new()); // need to split?
/* ... what to do next to process incoming data line-by-line ...? */
到目前为止,我提出了这个解决方案,但不确定它有多好
tokio::spawn(async {
let connection = TcpStream::connect("172.16.100.10:1001").await.unwrap();
let mut reader = BufReader::new(connection);
loop {
let mut line = String::new();
reader.read_line(&mut line).await.unwrap();
println!("{}", line);
}
});
解决方案
带有 Cargo.toml 的简单应用程序,例如:
[dependencies]
tokio = { version = "0.3", features = ["full"] }
tokio-util = { version = "0.4", features = ["codec"] }
和 main.rs 之类的:
use tokio::net::{TcpListener, TcpStream };
use tokio_util::codec::{ Framed, LinesCodec };
use tokio::stream::StreamExt;
use std::error::Error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let args: Vec<String> = std::env::args().collect();
if args[1] == "server"
{
let local_addr: String = format!("{}{}",":::",args[2]); // app <server | client> <port>
let listener = TcpListener::bind(&local_addr).await?;
while let Ok((socket, peer)) = listener.accept().await {
tokio::spawn(async move {
println!("Client Connected from: {}",peer.to_string());
let mut client = Framed::new(socket, LinesCodec::new_with_max_length(1024));
while let Some(Ok(line)) = client.next().await {
println!("{}", line);
}
});
}
}
else if args[1] == "client"
{
let port = args[2].parse::<u16>().unwrap(); // app client <port>
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
let conn = TcpStream::connect(saddr).await?;
let mut server = Framed::new(conn, LinesCodec::new_with_max_length(1024));
while let Some(Ok(line)) = server.next().await {
println!("{}", line);
}
}
Ok({})
}
作为服务器运行:
cargo run server 8080
(in another shell) nc localhost 8080
作为客户端运行:
(in another shell) nc -l -p 8080
cargo run client 8080
推荐阅读
- sql - 使用用户定义的表类型从 excel 到 SQL 的 MVC 300K 行数据表 - 操作数类型冲突
- javascript - React Native 与 Animated 同步声音
- google-cloud-platform - 如何将角色绑定添加到特定的 memorystore 实例?
- r - 使用 plm 包时如何解决“系统在计算上是奇异的:倒数条件数 = 3.12737e-31”错误消息
- c# - Hololens v2 与 Android 智能手机之间的通信
- c++ - CMake:使用 MSVC 2017 到 CLion 的 Qt 项目
- regex - 字符串末尾的正则表达式否定匹配
- python - “zsh:未知文件属性:0”在命令行上传递 Python 元组
- python - 如何从pdf(概率密度函数)中分离两个分布?
- python - 按月划分 csv 文件结果