Why is my async TcpStream blocking?

Question

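I'm working on a project that implements a distributed key-value store in Rust. I wrote the server side on top of Tokio's async runtime. I've run into a problem where my async code seems to block: when the server has multiple connections, only one TcpStream gets processed. I'm new to writing async code, both in general and in Rust, but I assumed that if there was no activity on a given TCP stream, other streams would still be accepted and processed.

Is my understanding of async wrong, or am I using Tokio incorrectly?

Here is my entry point: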

use std::error::Error;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use env_logger;
use log::{debug, info};
use structopt::StructOpt;
use tokio::net::TcpListener;

extern crate blue;

use blue::ipc::message;
use blue::store::args;
use blue::store::cluster::{Cluster, NodeRole};
use blue::store::deserialize::deserialize_store;
use blue::store::handler::handle_stream;
use blue::store::wal::WriteAheadLog;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

    let opt = args::Opt::from_args();
    let addr = SocketAddr::from_str(format!("{}:{}", opt.host, opt.port).as_str())?;
    let role = NodeRole::from_str(opt.role.as_str()).unwrap();
    let leader_addr = match role {
        NodeRole::Leader => addr,
        NodeRole::Follower => SocketAddr::from_str(opt.follow.unwrap().as_str())?,
    };

    let wal_name = addr.to_string().replace(".", "").replace(":", "");
    let wal_full_name = format!("wal{}.log", wal_name);
    let wal_path = PathBuf::from(wal_full_name);
    let mut wal = match wal_path.exists() {
        true => {
            info!("Existing WAL found");
            WriteAheadLog::open(&wal_path)?
        }
        false => {
            info!("Creating WAL");
            WriteAheadLog::new(&wal_path)?
        }
    };
    debug!("WAL: {:?}", wal);

    let store_name = addr.to_string().replace(".", "").replace(":", "");
    let store_pth = format!("{}.pb", store_name);
    let store_path = Path::new(&store_pth);
    let mut store = match store_path.exists() {
        true => deserialize_store(store_path)?,
        false => message::Store::default(),
    };

    let listener = TcpListener::bind(addr).await?;
    let cluster = Cluster::new(addr, &role, leader_addr, &mut wal, &mut store).await?;

    let store_path = Arc::new(store_path);
    let store = Arc::new(Mutex::new(store));

    let wal = Arc::new(Mutex::new(wal));
    let cluster = Arc::new(Mutex::new(cluster));
    info!("Blue launched. Waiting for incoming connection");

    loop {
        let (stream, addr) = listener.accept().await?;
        info!("Incoming request from {}", addr);
        let store = Arc::clone(&store);
        let store_path = Arc::clone(&store_path);
        let wal = Arc::clone(&wal);
        let cluster = Arc::clone(&cluster);
        handle_stream(stream, store, store_path, wal, cluster, &role).await?;
    }
}

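Below is my handler (handle_stream from above). I've left out all of the arms inside match input because I don't think they're needed to demonstrate the problem (the full code for that part is here: https://github.com/matthewmturner/Bradfield-Distributed-Systems/blob/main/blue/src/store/handler.rs in case it really helps).

Specifically, the point where it blocks is the line let input = async_read_message::<message::Request>(&mut stream).await;

This is where the server waits for communication from a client or from another server in the cluster. The behavior I currently see is that once a client has connected to the server, the server doesn't receive any of the requests to add other nodes to the cluster; it only handles the client stream.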

use std::io;
use std::net::{SocketAddr, TcpStream};
use std::path::Path;
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use log::{debug, error, info};
use serde_json::json;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream as asyncTcpStream;

use super::super::ipc::message;
use super::super::ipc::message::request::Command;
use super::super::ipc::receiver::async_read_message;
use super::super::ipc::sender::{async_send_message, send_message};
use super::cluster::{Cluster, NodeRole};
use super::serialize::persist_store;
use super::wal::WriteAheadLog;

// TODO: Why isnt async working? I.e. connecting servers after client is connected stays on client stream.
pub async fn handle_stream<'a>(
    mut stream: asyncTcpStream,
    store: Arc<Mutex<message::Store>>,
    store_path: Arc<&Path>,
    wal: Arc<Mutex<WriteAheadLog<'a>>>,
    cluster: Arc<Mutex<Cluster>>,
    role: &NodeRole,
) -> io::Result<()> {
    loop {
        info!("Handling stream: {:?}", stream);
        let input = async_read_message::<message::Request>(&mut stream).await;
        debug!("Input: {:?}", input);
        match input {
        ...
        }
    }
}

Here is the code for async_read_message:

pub async fn async_read_message<M: Message + Default>(
    stream: &mut asyncTcpStream,
) -> io::Result<M> {
    let mut len_buf = [0u8; 4];
    debug!("Reading message length");
    stream.read_exact(&mut len_buf).await?;
    let len = i32::from_le_bytes(len_buf);
    let mut buf = vec![0u8; len as usize];
    debug!("Reading message");
    stream.read_exact(&mut buf).await?;
    let user_input = M::decode(&mut buf.as_slice())?;
    debug!("Received message: {:?}", user_input);
    Ok(user_input)
}

Tags: asynchronous, rust, rust-tokio

Answer


Your problem lies in how you handle a connection once a client has connected:

handle_stream(stream, store, store_path, wal, cluster, &role).await?;

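The .await means your listening loop waits for handle_stream to return, but (making some assumptions) this function doesn't return until the client has disconnected. What you want is to tokio::spawn a new task that can run independently: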

tokio::spawn(handle_stream(stream, store, store_path, wal, cluster, &role));

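You may have to change some of your parameter types to avoid lifetimes; tokio::spawn requires 'static because the task's lifetime is decoupled from the scope that spawned it.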

