rust - 使用 Tokio 启动多个线程
问题描述
我正在尝试创建一个基本的 tcp 服务器:
- 服务器应该能够向所有连接的客户端广播消息流
- 服务器应该能够接收来自所有客户端的命令并处理它们
这就是我的main
功能:
let (server_tx, server_rx) = mpsc::unbounded();
let state = Arc::new(Mutex::new(Shared::new(server_tx)));
let addr = "127.0.0.1:6142".parse().unwrap();
let listener = TcpListener::bind(&addr).unwrap();
let server = listener.incoming().for_each(move |socket| {
// Spawn a task to process the connection
process(socket, state.clone());
Ok(())
}).map_err(|err| {
println!("accept error = {:?}", err);
});
println!("server running on localhost:6142");
let _messages = server_rx.for_each(|_| {
// process messages here
Ok(())
}).map_err(|err| {
println!("message error = {:?}", err);
});
tokio::run(server);
(操场)
我使用chat.rs
来自 tokio 存储库的示例作为基础。
我正在向server_tx
传入的 tcp 消息发送数据。
我遇到的麻烦是消费它们。
我正在使用“消费”传入消息流server_rx.for_each(|_| {
,现在,我如何告诉 tokio 运行它?
tokio::run
接受一个未来,但我有 2 个(可能更多)。我如何组合它们以使它们并行运行?
解决方案
一起加入期货:
let messages = server_rx.for_each(|_| {
println!("Message broadcasted");
Ok(())
}).map_err(|err| {
println!("accept error = {:?}", err);
});
tokio::run(server.join(messages).map(|_| ()));
需要组合器map()
,因为关联类型是一个元组并
使用需要一个类型的未来任务Join
Item
((), ())
tokio::run()
Future::Item
()
推荐阅读
- azure - 如何通过 Azure AD API 禁用教职员工许可证中的特定应用程序?
- c# - 如何在 iOS 本机代码或 Unity C# 中知道电话设备当前是否正在通话?
- python-3.x - TypeError:需要一个类似字节的对象,而不是 Geniatagger 的“str”?
- react-native - 有没有办法在另一个网络上运行(预览)博览会项目?
- reactjs - 将组件状态传递给兄弟
- intellij-idea - 如何通过相对路径或使用环境变量在 Idea http 编辑器中处理文件名?
- scala - countWindow 和 timeWindow 如何连接?
- php - 索引为 google.com 的网站和所有页面均被排除在外
- .htaccess - 从模块博客到信息页面的 Opencart URL 重定向
- mysql - 如何在日期时间中减去 1 天?