首页 > 解决方案 > 线程中使用的所有东西都必须在 Rust 中可以“发送”吗?

问题描述

我正在尝试在 Rust 中实现并发处理。这是(简化的)代码(游乐场):

struct Checker {
    unsendable: std::rc::Rc<String> // `Rc` does not implement Send + Sync (`Rc` just for simplicity, a graph actually)
}

impl Checker {
    pub fn check(&self, input: String) -> bool {
        // some logics (no matter)
        true
    }
}

struct ParallelChecker {
    allow_checker: Checker,
    block_checker: Checker
}

impl ParallelChecker {
    // `String` for simplicity here
    pub fn check(&self, input: String) -> bool {
        let thread_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(2)
            .build()
            .unwrap();

        // scoped thread pool for simplicity (to avoid `self` is not `'static`)
        thread_pool.scope(move |s| {
            s.spawn(|_| {
                let result = self.allow_checker.check(input.clone());
                // + send result via channel
            });
        });

        thread_pool.scope(move |s| {
            s.spawn(|_| {
                let result = self.block_checker.check(input.clone());
                // + send result via channel
            });
        });

        true // for simplicity
        // + receive result form the channels
    }
}

问题是self.allow_checker并且self.block_checker不实施Send

55 |         thread_pool.scope(move |s| {
   |                     ^^^^^ `Rc<String>` cannot be sent between threads safely
   |
   = help: within `Checker`, the trait `Send` is not implemented for `Rc<String>`
   = note: required because it appears within the type `Checker`
   = note: required because of the requirements on the impl of `Send` for `Mutex<Checker>`
   = note: 1 redundant requirements hidden
   = note: required because of the requirements on the impl of `Send` for `Arc<Mutex<Checker>>`
   = note: required because it appears within the type `[closure@src/parallel_checker.rs:55:27: 60:10]`

我的印象是只有通过渠道发送的东西必须实现Send + Sync,我在这里可能错了。

如您所见,线程代码没有任何共享变量(除了self)。Send如何在不实施同步且不支付任何费用的情况下使其正常工作?

我试图同步访问(虽然它看起来没用,因为线程中没有共享变量),但没有运气:

    let allow_checker = Arc::new(Mutex::new(self.allow_checker));
        thread_pool.scope(move |s| {
            s.spawn(|_| {
                let result = allow_checker.lock().unwrap().check(input.clone());
                // + send result via channel
            });
        });

PS。由于性能问题以及要迁移到的大量相关代码,迁移到Arc是非常不受欢迎的Arc

标签: multithreadingrust

解决方案


我的印象是只有通过渠道发送的东西必须实现Send + Sync,我在这里可能错了。

有点错误:

  • 类型是Send其值是否可以跨线程发送。例如,许多类型是SendString是。
  • 一个类型是指Sync是否可以从多个线程访问对其值的引用而不会引起任何数据争用。String也许令人惊讶的是,这意味着Sync——由于共享时是不可变的——并且通常T适用Sync于任何&T情况Send

请注意,这些规则不关心值如何跨线程发送或共享,只关心它们。

这在这里很重要,因为用于启动线程的闭包本身是跨线程发送的:它在“启动器”线程上创建,并在“启动”线程上执行。

结果,这个闭包必须是Send,这意味着它捕获的任何东西都必须是Send

为什么不Rc执行Send

因为它的引用计数是非原子的。

也就是说,如果您有:

  • 线程 A:一Rc
  • 线程 B:一个Rc(相同的指针)。

然后线程 A 在线程 B 创建克隆的同时删除其句柄,您希望计数为 2(仍然),但由于非原子访问,尽管仍然存在 2 个句柄,但它可能只有 1:

  • 线程 A 读取计数 (2)。
  • 线程 B 读取计数 (2)。
  • 线程 B 写入递增计数 (3)。
  • 线程 A 写入递减计数 (1)。

然后,下一次 B 放下手柄时,项目被破坏,内存被释放,任何通过最后一个手柄进行的进一步访问都会在你的脸上炸开。

我试图同步访问(虽然它看起来没用,因为线程中没有共享变量),但没有运气:

你不能包装一个类型来制作它Send,它没有帮助,因为它不会改变基本属性。

Rc即使包裹RcArc<Mutex<...>>.

因此!Send具有传染性并“感染”任何包含类型。

由于性能问题以及要迁移到的大量相关代码,迁移到Arc是非常不受欢迎的Arc

ArcChecker本身的性能开销相对较小,因此除非您继续克隆那些您可能可以改进的东西,否则它似乎不太重要——传递引用而不是克隆。

这里较高的开销将来自Mutex(或RwLock) if Checkeris not Sync。正如我所提到的,任何不可变的值都是平凡Sync的,所以如果你可以重构Checkerto be的内部状态Sync,那么你可以Mutex完全避免而只拥有Checkercontains Arc<State>

如果您目前有可变状态,请考虑提取它,朝着:

struct Checker {
    unsendable: Arc<Immutable>,
}

impl Checker {
    pub fn check(&self, input: String) -> Mutable {
        unimplemented!()
    }
}

或者放弃并行检查的想法。


推荐阅读