python - Python:产生时并行处理
问题描述
我正在文件列表中创建行的生成器,我的方法类似于:
def load(f_name: str):
with open(f_name, "r") as f:
lines = f.readlines()
# some calculations
return lines
def iter_files(f_names: list):
for f in f_names:
for line in load(f):
yield line
如果可能且有用的话,我想做的是加载下一个文件,同时屈服于另一个文件。作为多处理的新手,我尝试了以下方法:
cache = dict()
def load(f_name: str, id: int):
global cache
with open(f_name, "r") as f:
lines = f.readlines()
# some calculations
cache[id] = lines
def iter_list(arr):
for x in arr:
yield x
def iter_files(f_names: list):
global cache
num_files = len(f_names)
load(f_names[0], 0)
for n in range(num_files - 1):
current = mp.Process(target=iter_list, args=(cache[n],))
next = mp.Process(target=load, args=(f_names[n + 1], n + 1))
current.start()
next.start()
current.join()
next.join()
del cache[n]
iter_list(cache[num_files - 1])
del cache[num_files - 1]
但除了看起来过于复杂之外,它还不起作用。
首先,如果我不将主代码放入 'if __name__ == "__main__":' (我宁愿不是强制性的),我会收到以下错误:
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
但即使我这样做,文件也不会添加到缓存中:
current = mp.Process(target=iter_list, args=(cache[n],))
KeyError: 1
是否有可能实现我想要做的事情?我究竟做错了什么?
谢谢你们
解决方案
该multiprocessing.Queue
课程非常适合此。您put
在一端(子流程)输入线路,get
然后在另一端(主流程)返回。不幸的是,没有内置方法可以将队列标记为“已完成”,因此我们需要put
一个标记值,例如None
指示所有行都已处理。
import multiprocessing as mp
def load(f_name: str):
with open(f_name, "r") as f:
lines = f.readlines()
# some calculations
return lines
def iter_files(f_names: list, queue: mp.Queue):
for f in f_names:
for line in load(f):
queue.put(line)
queue.put(None)
def iter_files_process(f_names: list):
queue = mp.Queue()
process = mp.Process(target=iter_files, args=(f_names, queue))
process.start()
while True:
line = queue.get()
if line is None: # End-of-queue value.
break
yield line
process.join() # Wait for the process to be completely finished.
if __name__ == "__main__":
for line in iter_files_process(['a.txt', 'b.txt']):
print(line, end='')
推荐阅读
- c# - .NET5/Core 下的 Linux 进程输出重定向导致不正确/损坏的行为
- c# - 按我们想要的特定顺序对 DataTable 的行进行排序
- python - Pyspark 日期格式
- hana - 用 SQL 检查 HANA 版本?
- python - 每当我尝试导入任何开源库或尝试升级我的 pip 时,都会弹出此错误。有人可以帮帮我吗?
- visual-studio-code - 将调试器步骤的结果显示为内联文本
- android - 是否可以让循环暂停以等待其中的活动完成?
- html - 图像后显示的小空白
- c++ - 应用程序的返回码是 int16_t?
- c# - 从 Internet 链接下载到动态创建的文件夹。这个文件夹的路径如何?