首页 > 解决方案 > 如何共享 TlsStream在经纱请求中

问题描述

我有一个warp微服务,它在每个请求中创建一个 TCP 连接并从中写入/读取,工作代码如下所示:

#[derive(Clone, Debug)]
pub struct Redis {
    pub host: String,
    pub user: Option<String>,
    pub pass: Option<String>,
    pub v46: bool,
    pub port: u16,
    pub tls: native_tls::TlsConnector,
}

#[tokio::main]
async fn main() {
    let redis: options::Redis = match options::new() {
        Ok(o) => o,
        Err(e) => {
            eprintln!("{}", e);
            process::exit(1);
        }
    };

    let args = warp::any().map(move || redis.clone());

    let state_route = warp::any()
        .and(args)
        .and_then(state_handler)
        .recover(handle_rejection);

    warp::serve(state_route).run((addr, port)).await;
}

state_route每个请求中,我进行 TCP 连接:

async fn state_handler(redis: options::Redis) -> Result<impl warp::Reply, warp::Rejection> {
    let conn = timeout(Duration::from_secs(3), TcpStream::connect(&redis.host))
        .await
        .map_err(|e| warp::reject::custom(RequestTimeout(e.to_string())))?
        .map_err(|e| warp::reject::custom(ServiceUnavailable(e.to_string())))?;

    let stream = TlsConnector::from(redis.tls.clone())
        .connect(&redis.host, conn)
        .await
        .map_err(|e| warp::reject::custom(ServiceUnavailable(e.to_string())))?;

    let mut buf = BufStream::new(stream);
    ...
} 

这按预期工作,但我想main在处理程序中连接并共享连接,以防止每个请求都有多个连接。

我正在尝试这样的Arc事情:

async fn try_main() -> Result<()> {
    let redis: options::Redis = options::new()?;

    // TCP connect
    let conn = get_connection(&redis.host).await?;

    // TLS
    let stream = TlsConnector::from(redis.tls.clone())
        .connect(&redis.host, conn)
        .await?;

    let client = Arc::new(stream);

    let args = warp::any().map(move || client.clone());

    let state_route = warp::any()
        .and(args)
        .and_then(state_handler)
        .recover(handle_rejection);

    warp::serve(state_route).run((addr, port)).await;
    Ok(())
}

但我不知道如何共享与处理程序的连接,这就是我正在尝试的:

async fn state_handler(stream: Arc<TlsStream<TcpStream>>) -> Result<impl warp::Reply, warp::Rejection> {
    let data = Arc::clone(&stream);
    println!("{:#?}", data); // TlsStream... as expected
    let mut buf = BufStream::new(data);
    ...

我得到的错误是在尝试使用时BufStream,我得到:

trait bound `Arc<tokio_native_tls::TlsStream<tokio::net::TcpStream>>: AsyncRead` is not satisfied
   |
89 |     let mut buf = BufStream::new(data);
   |                                  ^^^^ the trait `AsyncWrite` is not implemented for `Arc<tokio_native_tls::TlsStream<tokio::net::TcpStream>>`
   |
   = note: required by `BufStream::<RW>::new`

有人建议我使用频道,例如:

let (tx, rx) = mpsc::unbounded_channel();

但我不知道如何在warp处理程序中实现这一点,因为每个请求我都需要写入/读取套接字。

关于如何实现这一点的任何想法?

标签: rusttcpasync-awaitrust-tokio

解决方案


推荐阅读