首页 > 解决方案 > 通过将 FnMut 包装成 Weak 将其发送到其他线程> 抱怨缺少发送

问题描述

我正在尝试用事件和回调编写一个简单的观察者模式。这个想法是,当拥有变异函数的事物死亡 ( struct Obj {cb: Arc<Mutex<FnMut>>, .. }) 时,它会丢弃Arc. 之后,任何Weak引用都将(有效地)失效,并且可以在访问时检测到。我也希望这能将FnMutfromThread'static生命周期解耦。我可能需要将整个东西包装在另一个Arc<Rwlock<T>>中,以防止Weak引用做线程不安全的事情,但这是一个不同的问题。

我得到的最接近的解决方案是:如何将函数发送到另一个线程?

但是,我似乎拥有Arc<Mutex<T>>并增加了生命周期(尽管我可能做错了)似乎没有帮助。我有点迷茫到底什么是错的。

我写了一个最小的例子:

use std::{
    collections::HashMap,
    sync::{
        mpsc::{self, Receiver},
        Arc, Mutex, Weak,
    },
    thread::{self, JoinHandle},
};

struct Callback {
    cb_w: Weak<Mutex<FnMut()>>,
}

type FnMapping = Arc<HashMap<String, Callback>>;

fn start_thread<'a>(rx: Receiver<String>, mapping: FnMapping) -> JoinHandle<()> {
    thread::spawn(move || {
        match rx.recv() {
            Ok(v) => {
                if let Some(o) = mapping.get(&v) {
                    match o.cb_w.upgrade() {
                        Some(cb_m) => match cb_m.lock() {
                            Ok(cb_lg) => (*cb_lg)(),
                            Err(e) => (),
                        },
                        None => { /* TODO owner is gone, mark for delete */ }
                    }
                }
            }
            Err(e) => (),
        }
    })
}

fn main() {
    let mapping: FnMapping = Arc::new(HashMap::new());
    let (tx, rx) = mpsc::channel();
    drop(tx);
    start_thread(rx, mapping)
        .join()
        .expect("Could not join thread -- failed to terminate?");
    println!("Leaving the test bay.");
}

这无法编译并出现以下错误:

error[E0277]: `(dyn std::ops::FnMut() + 'static)` cannot be sent between threads safely
  --> src/main.rs:17:5
   |
17 |     thread::spawn(move || {
   |     ^^^^^^^^^^^^^ `(dyn std::ops::FnMut() + 'static)` cannot be sent between threads safely
   |
   = help: the trait `std::marker::Send` is not implemented for `(dyn std::ops::FnMut() + 'static)`
   = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<(dyn std::ops::FnMut() + 'static)>`
   = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Weak<std::sync::Mutex<(dyn std::ops::FnMut() + 'static)>>`
   = note: required because it appears within the type `Callback`
   = note: required because it appears within the type `(std::string::String, Callback)`
   = note: required because it appears within the type `std::marker::PhantomData<(std::string::String, Callback)>`
   = note: required because it appears within the type `std::collections::hash::table::RawTable<std::string::String, Callback>`
   = note: required because it appears within the type `std::collections::HashMap<std::string::String, Callback>`
   = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::collections::HashMap<std::string::String, Callback>>`
   = note: required because it appears within the type `[closure@src/main.rs:17:19: 32:6 rx:std::sync::mpsc::Receiver<std::string::String>, mapping:std::sync::Arc<std::collections::HashMap<std::string::String, Callback>>]`
   = note: required by `std::thread::spawn`

标签: multithreadingrust

解决方案


推荐阅读