首页 > 解决方案 > 在异步 Rust (Tokio) 中包装阻塞 mpsc

问题描述

我正在尝试使用 Tokio包装一个同步 MQTT 客户端库。代码需要通过 std::sync::mpsc通道不断接收消息并将它们发送到异步代码中。我了解如何spawn_blocking用于包装返回单个值的代码。但是如何应用它来包装一个不断从std::sync::mpsc通道接收消息的循环呢?

这是我用来向频道发送消息的代码。

let (mut tx, mut rx) = std::sync::mpsc::channel();

tokio::spawn(async move {
            let mut mqtt_options = MqttOptions::new("bot", settings.mqtt.host, settings.mqtt.port);
            let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap();

            mqtt_client.subscribe(settings.mqtt.topic_name, QoS::AtLeastOnce).unwrap();

            tokio::task::spawn_blocking(move || {
                println!("Waiting for notifications");
                for notification in notifications {
                    match notification {
                        rumqtt::Notification::Publish(publish) => {
                            let payload = Arc::try_unwrap(publish.payload).unwrap();
                            let text: String = String::from_utf8(payload).expect("Can't decode payload for notification");
                            println!("Recieved message: {}", text);
                            let msg: Message = serde_json::from_str(&text).expect("Error while deserializing message");
                            println!("Deserialized message: {:?}", msg);
                            println!("{}", msg);
                            tx.send(msg);
                        }
                        _ => println!("{:?}", notification)
                    }
                }
            });
    });

但我不确定我应该如何使用 tokio API 在另一个异步闭包中接收这些消息。

tokio::task::spawn(async move || {
    // How to revieve messages via `rx` here? I can't use tokio::sync::mpsc channels 
    // since the code that sends messages is blocking.
});

标签: rustrust-tokio

解决方案


我在rust-lang 社区上发布了一个单独的帖子,并在那里得到了答案。

std::sync::mpsc::channel可以换成tokio::sync::mpsc::unbounded_channel,它有一个非异步发送方法。它解决了这个问题。


推荐阅读