python - 无法将函数并行映射到 tarfile 成员
问题描述
我有一个包含 bz2 压缩文件的 tarfile。我想将该函数clean_file
应用于每个 bz2 文件,并整理结果。在系列中,这很容易通过循环:
import pandas as pd
import json
import os
import bz2
import itertools
import datetime
import tarfile
from multiprocessing import Pool
def clean_file(member):
if '.bz2' in str(member):
f = tr.extractfile(member)
with bz2.open(f, "rt") as bzinput:
dicts = []
for i, line in enumerate(bzinput):
line = line.replace('"name"}', '"name":" "}')
dat = json.loads(line)
dicts.append(dat)
bzinput.close()
f.close()
del f, bzinput
processed = dicts[0]
return processed
else:
pass
# Open tar file and get contents (members)
tr = tarfile.open('data.tar')
members = tr.getmembers()
num_files = len(members)
# Apply the clean_file function in series
i=0
processed_files = []
for m in members:
processed_files.append(clean_file(m))
i+=1
print('done '+str(i)+'/'+str(num_files))
但是,我需要能够并行执行此操作。我正在尝试使用的方法Pool
如下:
# Apply the clean_file function in parallel
if __name__ == '__main__':
with Pool(2) as p:
processed_files = list(p.map(clean_file, members))
但这会返回一个 OSError:
Traceback (most recent call last):
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "parse_data.py", line 19, in clean_file
for i, line in enumerate(bzinput):
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/bz2.py", line 195, in read1
return self._buffer.read1(size)
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py", line 68, in readinto
data = self.read(len(byte_view))
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py", line 103, in read
data = self._decompressor.decompress(rawblock, size)
OSError: Invalid data stream
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "parse_data.py", line 53, in <module>
processed_files = list(tqdm.tqdm(p.imap(clean_file, members), total=num_files))
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/site-packages/tqdm/std.py", line 1167, in __iter__
for obj in iterable:
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py", line 735, in next
raise value
OSError: Invalid data stream
所以我猜这种方式不能从 data.tar 或其他东西中正确访问文件。如何并行应用该功能?
我猜这将适用于任何包含 bz2 文件的 tar 存档,但这是我重现错误的数据: https ://github.com/johnf1004/reproduce_tar_error
解决方案
您没有指定您正在运行的平台,但我怀疑它是 Windows,因为您有...
if __name__ == '__main__':
main()
...这对于在使用 OS 功能spawn
创建新进程的平台上创建进程的代码是必需的。但这也意味着当创建一个新进程(例如,您正在创建的进程池中的所有进程)时,每个进程都会从程序的最顶端重新执行源程序开始。这意味着每个池进程正在执行以下代码:
tr = tarfile.open('data.tar')
members = tr.getmembers()
num_files = len(members)
但是,我不明白为什么这本身会导致错误,但我不能确定。但是,问题可能是,这是在调用您的工作函数之后执行的,clean_file
正在被调用,因此tr
尚未设置。如果此代码之前clean_file
它可能会工作,但这只是一个猜测。当然提取members = tr.getmembers()
每个池进程中的成员是浪费的。每个进程都需要打开 tar 文件,最好只打开一次。
但很明显,您发布的堆栈跟踪与您的代码不匹配。你展示:
Traceback (most recent call last):
File "parse_data.py", line 53, in <module>
processed_files = list(tqdm.tqdm(p.imap(clean_file, members), total=num_files))
然而,您的代码没有任何引用tqdm
或使用 method imap
。现在,当您发布的代码与产生异常的代码不完全匹配时,分析您的问题实际上是什么变得更加困难。
如果您在可能fork
用于创建新进程的 Mac 上运行,则当主进程创建了多个线程(您不一定会看到,可能是通过tarfile
模块)然后您可能会出现问题创建一个新进程,我已经指定了代码来确保spawn
用于创建新进程。无论如何,以下代码应该可以工作。它还引入了一些优化。如果没有,请发布一个新的堆栈跟踪。
import pandas as pd
import json
import os
import bz2
import itertools
import datetime
import tarfile
from multiprocessing import get_context
def open_tar():
# open once for each process in the pool
global tr
tr = tarfile.open('data.tar')
def clean_file(member):
f = tr.extractfile(member)
with bz2.open(f, "rt") as bzinput:
for line in bzinput:
line = line.replace('"name"}', '"name":" "}')
dat = json.loads(line)
# since you are returning just the first occurrence:
return dat
def main():
with tarfile.open('data.tar') as tr:
members = tr.getmembers()
# just pick members where '.bz2' is in member:
filtered_members = filter(lambda member: '.bz2' in str(member), members)
ctx = get_context('spawn')
# open tar file just once for each process in the pool:
with ctx.Pool(initializer=open_tar) as pool:
processed_files = pool.map(clean_file, filtered_members)
print(processed_files)
# required for when processes are created using spawn:
if __name__ == '__main__':
main()
推荐阅读
- npm - 使用来自 Webpack npm dev 脚本的 Vue 开发版本
- postgresql - 转储 postgresql 数据库源一的编码为“C.UTF-8”,目标一的编码为“en_US.UTF-8”
- android - 为什么离开主片段后建议隐藏底部导航栏?
- c# - Jwt Bearer 和依赖注入
- matlab - 扩展卡尔曼滤波器预测更新时间
- python - 从文本中的缩写中查找首字母缩写词
- c# - 在 UI 元素设置期间忽略属性设置调用
- json - 将 json 列表发布到烧瓶路由时出现 TypeError
- mysql - 我正在尝试在触发器上使用算术,但我的触发器不起作用
- c# - Swashbuckle 多态性需要鉴别器作为更新时的第一个属性