rust - 如何共享 TlsStream在经纱请求中
问题描述
我有一个warp微服务,它在每个请求中创建一个 TCP 连接并从中写入/读取,工作代码如下所示:
#[derive(Clone, Debug)]
pub struct Redis {
pub host: String,
pub user: Option<String>,
pub pass: Option<String>,
pub v46: bool,
pub port: u16,
pub tls: native_tls::TlsConnector,
}
#[tokio::main]
async fn main() {
let redis: options::Redis = match options::new() {
Ok(o) => o,
Err(e) => {
eprintln!("{}", e);
process::exit(1);
}
};
let args = warp::any().map(move || redis.clone());
let state_route = warp::any()
.and(args)
.and_then(state_handler)
.recover(handle_rejection);
warp::serve(state_route).run((addr, port)).await;
}
在state_route
每个请求中,我进行 TCP 连接:
async fn state_handler(redis: options::Redis) -> Result<impl warp::Reply, warp::Rejection> {
let conn = timeout(Duration::from_secs(3), TcpStream::connect(&redis.host))
.await
.map_err(|e| warp::reject::custom(RequestTimeout(e.to_string())))?
.map_err(|e| warp::reject::custom(ServiceUnavailable(e.to_string())))?;
let stream = TlsConnector::from(redis.tls.clone())
.connect(&redis.host, conn)
.await
.map_err(|e| warp::reject::custom(ServiceUnavailable(e.to_string())))?;
let mut buf = BufStream::new(stream);
...
}
这按预期工作,但我想main
在处理程序中连接并共享连接,以防止每个请求都有多个连接。
我正在尝试这样的Arc
事情:
async fn try_main() -> Result<()> {
let redis: options::Redis = options::new()?;
// TCP connect
let conn = get_connection(&redis.host).await?;
// TLS
let stream = TlsConnector::from(redis.tls.clone())
.connect(&redis.host, conn)
.await?;
let client = Arc::new(stream);
let args = warp::any().map(move || client.clone());
let state_route = warp::any()
.and(args)
.and_then(state_handler)
.recover(handle_rejection);
warp::serve(state_route).run((addr, port)).await;
Ok(())
}
但我不知道如何共享与处理程序的连接,这就是我正在尝试的:
async fn state_handler(stream: Arc<TlsStream<TcpStream>>) -> Result<impl warp::Reply, warp::Rejection> {
let data = Arc::clone(&stream);
println!("{:#?}", data); // TlsStream... as expected
let mut buf = BufStream::new(data);
...
我得到的错误是在尝试使用时BufStream
,我得到:
trait bound `Arc<tokio_native_tls::TlsStream<tokio::net::TcpStream>>: AsyncRead` is not satisfied
|
89 | let mut buf = BufStream::new(data);
| ^^^^ the trait `AsyncWrite` is not implemented for `Arc<tokio_native_tls::TlsStream<tokio::net::TcpStream>>`
|
= note: required by `BufStream::<RW>::new`
有人建议我使用频道,例如:
let (tx, rx) = mpsc::unbounded_channel();
但我不知道如何在warp
处理程序中实现这一点,因为每个请求我都需要写入/读取套接字。
关于如何实现这一点的任何想法?
解决方案
推荐阅读
- plsql - 事务管理相关问题
- javascript - 将 Vtt 文件从 express 服务器发送到客户端
- flutter - 通过插入视频链接从 tiktok 下载视频。颤振/飞镖
- eclipse-cdt - eclipse上CppStyle插件的问题
- javascript - 我正在尝试安装 Material Top Tabs Navigator React Native 6.xx 版,但出现错误
- html - 当页面包含溢出表格时,HTML 电子邮件的视口缩放不起作用
- reactjs - React JS:使用 setInterval() 触发函数以每秒执行一次
- r - 获取虚拟变量的百分比
- javascript - webpack:没有 css 模块的 css 加载器 LocalIdentName
- groovy - 无法使用 groovy 从 SOAP UI 获取响应值