rust - 在异步 Rust (Tokio) 中包装阻塞 mpsc
问题描述
我正在尝试使用 Tokio包装一个同步 MQTT 客户端库。代码需要通过 std::sync::mpsc
通道不断接收消息并将它们发送到异步代码中。我了解如何spawn_blocking
用于包装返回单个值的代码。但是如何应用它来包装一个不断从std::sync::mpsc
通道接收消息的循环呢?
这是我用来向频道发送消息的代码。
let (mut tx, mut rx) = std::sync::mpsc::channel();
tokio::spawn(async move {
let mut mqtt_options = MqttOptions::new("bot", settings.mqtt.host, settings.mqtt.port);
let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap();
mqtt_client.subscribe(settings.mqtt.topic_name, QoS::AtLeastOnce).unwrap();
tokio::task::spawn_blocking(move || {
println!("Waiting for notifications");
for notification in notifications {
match notification {
rumqtt::Notification::Publish(publish) => {
let payload = Arc::try_unwrap(publish.payload).unwrap();
let text: String = String::from_utf8(payload).expect("Can't decode payload for notification");
println!("Recieved message: {}", text);
let msg: Message = serde_json::from_str(&text).expect("Error while deserializing message");
println!("Deserialized message: {:?}", msg);
println!("{}", msg);
tx.send(msg);
}
_ => println!("{:?}", notification)
}
}
});
});
但我不确定我应该如何使用 tokio API 在另一个异步闭包中接收这些消息。
tokio::task::spawn(async move || {
// How to revieve messages via `rx` here? I can't use tokio::sync::mpsc channels
// since the code that sends messages is blocking.
});
解决方案
我在rust-lang 社区上发布了一个单独的帖子,并在那里得到了答案。
std::sync::mpsc::channel
可以换成tokio::sync::mpsc::unbounded_channel
,它有一个非异步发送方法。它解决了这个问题。
推荐阅读
- ruby-on-rails - Ruby Rails Excel 格式数组操作
- angular - 将所有工件移至 S3 根目录
- tfs - 构建定义如何在构建过程中取消搁置给定的搁置集
- http - Postman 如何通过 HTTP 发送文件?
- java - ModelMapper 和 LocalDate - Spring Boot
- api - 将搜索框添加到多边形绘图谷歌地图
- python - 如何让自适应 dask 工作人员在启动时运行一些代码?
- c# - DisplayAlert 在 WPF 中没有响应 - Xamarin Forms
- .net - 使用动态列保护动态 SQL Where 子句
- batch-file - 与输出的空白差异