首页 > 解决方案 > 如何安全地在线程之间移动数据?

问题描述

我目前正在尝试调用一个函数,我将多个文件名传递给该函数,并期望该函数读取文件并生成适当的结构并将它们以Vec<Audit>. 我已经能够完成它一个一个地读取文件,但我想使用线程来实现它。

这是功能:

fn generate_audits_from_files(files: Vec<String>) -> Vec<Audit> {
    let mut audits = Arc::new(Mutex::new(vec![]));
    let mut handlers = vec![];

    for file in files {
        let audits = Arc::clone(&audits);

        handlers.push(thread::spawn(move || {
            let mut audits = audits.lock().unwrap();
            audits.push(audit_from_xml_file(file.clone()));
            audits
        }));
    }

    for handle in handlers {
        let _ = handle.join();
    }

    audits
        .lock()
        .unwrap()
        .into_iter()
        .fold(vec![], |mut result, audit| {
            result.push(audit);
            result
        })
}

但由于以下错误,它不会编译:

error[E0277]: `MutexGuard<'_, Vec<Audit>>` cannot be sent between threads safely
   --> src/main.rs:82:23
    |
82  |         handlers.push(thread::spawn(move || {
    |                       ^^^^^^^^^^^^^ `MutexGuard<'_, Vec<Audit>>` cannot be sent between threads safely
    | 
   ::: /home/enthys/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:618:8

我曾尝试将生成的Audit结构包装起来Some(Audit)以避免出现问题,MutexGuard但后来我遇到了Poisonned Thread问题。

标签: multithreadingrust

解决方案


错误的原因是在将新的审计推送到(锁定的)审计 vec 之后,您尝试返回 vec 的MutexGuard.

在 Rust 中,线程的函数实际上可以返回值,这样做的目的是将值发送回给join线程运行的任何人。这意味着该值将在线程之间移动,因此该值需要在线程之间移动(又名Send),而互斥锁没有理由是 [0]。

简单的解决方案是……不要那样做。只需删除函数的最后一行spawn。虽然它不像代码那样工作,因为你最后仍然有与事情相关的借用问题。

另一种方法是使用该功能(尤其是在Audit对象不太大的情况下):audits完全删除 vec,而是让每个线程返回其审核,然后从handlers当你收集join它们时收集:

pub fn generate_audits_from_files(files: Vec<String>) -> Vec<Audit> {
    let mut handlers = vec![];

    for file in files {
        handlers.push(thread::spawn(move || {
            audit_from_xml_file(file)
        }));
    }
    
    handlers.into_iter()
        .map(|handler| handler.join().unwrap())
        .collect()
}

尽管那时您不妨让 Rayon 处理它:

use rayon::prelude::*;

pub fn generate_audits_from_files(files: Vec<String>) -> Vec<Audit> {
    files.into_par_iter().map(audit_from_xml_file).collect()
}

如果您碰巧有数百万个文件,这也可以避免程序崩溃或机器瘫痪。

[0] 以及所有不支持的原因,不一定支持锁定一个线程并解锁另一个线程,例如ReleaseMutex

ReleaseMutex如果调用线程不拥有互斥对象,则该函数将失败。

(注意:在 Windows 术语中,“拥有”一个互斥体意味着通过 获取它,在 posix 术语WaitForSingleObject中翻译为)lock

并且可以是普通的UB,例如pthread_mutex_unlock

如果线程尝试解锁它尚未锁定的互斥锁或解锁的互斥锁,则会导致未定义的行为。


推荐阅读