python - 共享对象中的python多进程和线程安全
问题描述
我对线程安全和多处理有点不确定。
据我所知, multiprocessing.Pool.map 腌制调用函数或对象,但保持引用传递的成员完好无损。
这似乎是有益的,因为它可以节省内存,但我没有在这些对象中找到任何有关线程安全的信息。
就我而言,我正在尝试从磁盘读取 numpy 数据,但是,我希望能够在不更改实现的情况下修改源代码,因此我将读取部分分解为自己的类。
我大致有以下情况:
import numpy as np
from multiprocessing import Pool
class NpReader():
def read_row(self, row_index):
pass
class NpReaderSingleFile(NpReader):
def read_row(self, row_index):
return np.load(self._filename_from_row(row_index))
def _filename_from_row(self, row_index):
return Path(row_index).with_suffix('.npy')
class NpReaderBatch(NpReader):
def __init__(self, batch_file, mmap_mode=None):
self.batch = np.load(batch_file, mmap_mode=mmap_mode)
def read_row(self, row_index):
read_index = row_index
return self.batch[read_index]
class ProcessRow():
def __init__(self, reader):
self.reader = reader
def __call__(self, row_index):
return reader.read_row(row_index).shape
readers = [
NpReaderSingleFile(),
NpReaderBatch('batch.npy'),
NpReaderBatch('batch.npy', mmap_mode='r')
]
res = []
for reader in readers:
with Pool(12) as mp:
res.append(mp.map(ProcessRow(reader), range(100000))
在我看来,这里有很多可能出错的地方,但不幸的是,我没有知识来确定测试它的内容。
上述方法有什么明显的问题吗?
我想到的一些事情是:
- np.load (它似乎适用于小型单个文件,但我可以对其进行测试以查看它是否安全?
- NpReaderBatch 是安全的还是 read_index 可以被不同的进程同时修改?
解决方案
推荐阅读
- reactjs - ReactJs在使用Radium插件时获取未定义错误的styleKeeperContext'
- mysql - 为什么 SQL LIMIT 子句为每个查询返回随机行?
- coroutine - 支持克隆暂停例程的堆栈协程
- flutter - 使用 Chewie 控制器在构建异常期间调用 setState() 或 markNeedsBuild()
- python-3.x - Python 请求无法从 API 检索数据
- elasticsearch - 用于类似 SQL 的查询的弹性查询
- python - 如何通过使用熊猫施加限制来处理异常值?
- r - SAS/R 中的多项式/序数假设检验
- amazon-web-services - AWS Cloudwatch 规则安排 Cron 表达式在一天内跳过 2 小时
- python - 如何正确使用 Pyinstaller?