rust - 流上的 tokio::timeout 总是发生
问题描述
我正在尝试接受 UDP 消息,但只有在 5 秒内发生时,我才能通过手动实现 Stream 和使用 futures 库中的组合器构建一个 Stream 抽象。每种方式,在recv_from
未来解决后,持续时间将到期并且流将返回一个Err(Elapsed(()))
. 这不是预期的行为,如果返回一个值,则不应返回任何错误。
预期的行为是流将解决超时或Vec,但不是一个,然后是另一个 5 秒后。
use futures::{pin_mut, ready, stream::unfold, FutureExt};
use tokio::{
net::{udp, UdpSocket},
stream::{Stream, StreamExt},
time::{self, Duration},
};
use std::{
io,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
#[derive(Debug)]
pub(crate) struct UdpStream {
stream: udp::RecvHalf,
}
impl UdpStream {
fn new(stream: udp::RecvHalf) -> Self {
Self { stream }
}
fn stream(self) -> impl Stream<Item = io::Result<(Vec<u8>, SocketAddr)>> {
unfold(self.stream, |mut stream| async move {
let mut buf = [0; 4096];
match time::timeout(Duration::from_secs(5), stream.recv_from(&mut buf)).await {
Ok(Ok((len, src))) => {
Some((Ok((buf.iter().take(len).cloned().collect(), src)), stream))
}
e => {
println!("{:?}", e);
None
}
}
})
}
}
impl Stream for UdpStream {
type Item = io::Result<(Vec<u8>, SocketAddr)>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let socket = &mut self.stream;
pin_mut!(socket);
let mut buf = [0u8; 4096];
let (len, src) = ready!(Box::pin(socket.recv_from(&mut buf)).poll_unpin(cx))?;
Poll::Ready(Some(Ok((buf.iter().take(len).cloned().collect(), src))))
}
}
async fn listen_udp(addr: SocketAddr) -> io::Result<()> {
let udp = UdpSocket::bind(addr).await?;
let (mut udp_recv, mut udp_send) = udp.split();
let mut msg_stream = Box::pin(UdpStream::new(udp_recv).stream());
// use the manually implemented stream with this:
// let mut msg_stream = UdpStream::new(udp_recv).timeout(Duration::from_secs(5));
while let Some(msg) = msg_stream.next().await {
match msg {
Ok((buf, src)) => {
udp_send.send_to(&buf, &src).await?;
println!("Message recv: {:?}", buf);
}
Err(e) => {
eprintln!("timed out: {:?}", e);
}
}
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
listen_udp("127.0.0.1:9953".parse()?).await?;
Ok(())
}
您可以尝试运行此代码并使用echo "foo" | nc 127.0.0.1 9953 -u
或使用dig
货物.toml
[package]
name = "udp_test"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "0.2", features = ["full"] }
futures = "0.3"
解决方案
在您的流返回第一个(也是唯一的)元素后,它会返回等待下一个;它永远不会结束,直到超时。
基本上这里不需要流抽象。future
包裹在超时中会做:
use std::{io, net::SocketAddr};
use tokio::{
net::UdpSocket,
time::{self, Duration},
};
async fn listen_udp(addr: SocketAddr) -> io::Result<()> {
let mut udp = UdpSocket::bind(addr).await?;
let mut buf = [0; 4096];
match time::timeout(Duration::from_secs(5), udp.recv_from(&mut buf)).await? {
Ok((count, src)) => {
udp.send_to(&buf[..count], &src).await?;
println!("Message recv: {:?}", &buf[..count]);
}
Err(e) => {
eprintln!("timed out: {:?}", e);
}
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
listen_udp("127.0.0.1:9953".parse()?).await?;
Ok(())
}
推荐阅读
- node.js - Docker Node 映像在映像中可用时无法识别 node_modules 文件夹
- zip - 随机访问 ZIP 中的文件块
- assembly - MIPS:打印字符串的元音和辅音计数
- awk - 如何在第一行和最后一行末尾添加双引号
- python - 如何在 pyspark 列上使用“LanguageDetectorDL”火花 NLP?
- mysql - MySQL utf8_unicode_ci 将不同的字符视为相同
- javascript - 如何处理元素的onclick探针中的3倍引号
- typescript - 映射深层对象和可选参数
- powershell - Powershell - 如何在powershell中编写会话刷新脚本?
- node.js - 嵌套数组中的猫鼬唯一文档