asynchronous - 为什么异步 TcpStream 阻塞?
问题描述
我正在开发一个在 rust 中实现分布式键值存储的项目。我使用 Tokio 的异步运行时制作了服务器端代码。我遇到了一个问题,我的异步代码似乎被阻塞了,所以当我与服务器有多个连接时,只有一个 TcpStream 被处理。我是实现async
代码的新手,无论是在一般情况下还是在 rust 上,但我认为如果给定的 tcp 流上没有活动,其他流将被接受和处理。
我对 async 的理解是错误的还是我错误地使用了 tokio?
这是我的切入点:
use std::error::Error;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use env_logger;
use log::{debug, info};
use structopt::StructOpt;
use tokio::net::TcpListener;
extern crate blue;
use blue::ipc::message;
use blue::store::args;
use blue::store::cluster::{Cluster, NodeRole};
use blue::store::deserialize::deserialize_store;
use blue::store::handler::handle_stream;
use blue::store::wal::WriteAheadLog;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let opt = args::Opt::from_args();
let addr = SocketAddr::from_str(format!("{}:{}", opt.host, opt.port).as_str())?;
let role = NodeRole::from_str(opt.role.as_str()).unwrap();
let leader_addr = match role {
NodeRole::Leader => addr,
NodeRole::Follower => SocketAddr::from_str(opt.follow.unwrap().as_str())?,
};
let wal_name = addr.to_string().replace(".", "").replace(":", "");
let wal_full_name = format!("wal{}.log", wal_name);
let wal_path = PathBuf::from(wal_full_name);
let mut wal = match wal_path.exists() {
true => {
info!("Existing WAL found");
WriteAheadLog::open(&wal_path)?
}
false => {
info!("Creating WAL");
WriteAheadLog::new(&wal_path)?
}
};
debug!("WAL: {:?}", wal);
let store_name = addr.to_string().replace(".", "").replace(":", "");
let store_pth = format!("{}.pb", store_name);
let store_path = Path::new(&store_pth);
let mut store = match store_path.exists() {
true => deserialize_store(store_path)?,
false => message::Store::default(),
};
let listener = TcpListener::bind(addr).await?;
let cluster = Cluster::new(addr, &role, leader_addr, &mut wal, &mut store).await?;
let store_path = Arc::new(store_path);
let store = Arc::new(Mutex::new(store));
let wal = Arc::new(Mutex::new(wal));
let cluster = Arc::new(Mutex::new(cluster));
info!("Blue launched. Waiting for incoming connection");
loop {
let (stream, addr) = listener.accept().await?;
info!("Incoming request from {}", addr);
let store = Arc::clone(&store);
let store_path = Arc::clone(&store_path);
let wal = Arc::clone(&wal);
let cluster = Arc::clone(&cluster);
handle_stream(stream, store, store_path, wal, cluster, &role).await?;
}
}
下面是我的处理程序(handle_stream
来自上面)。我排除了所有处理程序,match input
因为我认为没有必要证明这一点(该部分的完整代码在这里:https ://github.com/matthewmturner/Bradfield-Distributed-Systems/blob/main/blue/ src/store/handler.rs如果它真的有帮助)。
具体来说,阻塞的点是线let input = async_read_message::<message::Request>(&mut stream).await;
这是服务器等待来自集群中的客户端或另一台服务器的通信的地方。我目前看到的行为是,在使用客户端连接到服务器后,服务器没有收到任何将其他节点添加到集群的请求——它只处理客户端流。
use std::io;
use std::net::{SocketAddr, TcpStream};
use std::path::Path;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use log::{debug, error, info};
use serde_json::json;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream as asyncTcpStream;
use super::super::ipc::message;
use super::super::ipc::message::request::Command;
use super::super::ipc::receiver::async_read_message;
use super::super::ipc::sender::{async_send_message, send_message};
use super::cluster::{Cluster, NodeRole};
use super::serialize::persist_store;
use super::wal::WriteAheadLog;
// TODO: Why isnt async working? I.e. connecting servers after client is connected stays on client stream.
pub async fn handle_stream<'a>(
mut stream: asyncTcpStream,
store: Arc<Mutex<message::Store>>,
store_path: Arc<&Path>,
wal: Arc<Mutex<WriteAheadLog<'a>>>,
cluster: Arc<Mutex<Cluster>>,
role: &NodeRole,
) -> io::Result<()> {
loop {
info!("Handling stream: {:?}", stream);
let input = async_read_message::<message::Request>(&mut stream).await;
debug!("Input: {:?}", input);
match input {
...
}
}
}
这是代码async_read_message
pub async fn async_read_message<M: Message + Default>(
stream: &mut asyncTcpStream,
) -> io::Result<M> {
let mut len_buf = [0u8; 4];
debug!("Reading message length");
stream.read_exact(&mut len_buf).await?;
let len = i32::from_le_bytes(len_buf);
let mut buf = vec![0u8; len as usize];
debug!("Reading message");
stream.read_exact(&mut buf).await?;
let user_input = M::decode(&mut buf.as_slice())?;
debug!("Received message: {:?}", user_input);
Ok(user_input)
}
解决方案
您的问题在于客户端连接后如何处理消息:
handle_stream(stream, store, store_path, wal, cluster, &role).await?;
这.await
意味着您的侦听循环将等待handle_stream
返回,但是(做出一些假设)此函数在客户端断开连接之前不会返回。你想要的是tokio::spawn
一个可以独立运行的新任务:
tokio::spawn(handle_stream(stream, store, store_path, wal, cluster, &role));
您可能必须更改某些参数类型以避免生命周期;tokio::spawn
需要'static
,因为任务的生命周期与生成它的范围解耦。
推荐阅读
- firemonkey - 如何在运行时更改 Listbox Itemdata.detail 字体大小 Delphi 10.3
- javascript - 如何将取消全选按钮添加到与脚本一起使用以更改类的已检查输入列表
- oracle-sqldeveloper - 无法启动 Oracle SQL Developer
- javascript - 配置 defaultNavigationOption headerBackImage 抛出未定义
- android - 如何通过 NFC 发送 json 结构化数据?
- java - Android:从 url 查看视频流
- logstash - LogStash 消息配置错误错误:无法执行操作
- azure - App Insight 警报 - 排除某些 http 错误或更改编码
- php - PDO 插入两条记录
- .net - 包含所有库时 .NET 3.0 代码中的返回代码错误