python - Can't pass a file handle and a lock to processes via multiprocessing.Pool?
Problem description
I'm using multiprocessing.Pool() to launch a number of processes, each of which writes to the same file (using a lock). Each process is given a "task", which is just a tuple of arguments. One of those arguments is a file handle and another is a lock. But Python won't let me pass either the file handle or the lock. (I can do this when I spawn the processes directly with multiprocessing.Process, just not with multiprocessing.Pool.)
Example:
import multiprocessing as mp
import time
import random

def thr_work00(args):
    arg0 = args[0]
    arg1 = args[1]
    arg2 = args[2]
    arg3 = args[3]
    arg4 = args[4]
    s = random.random()/10
    time.sleep(s)
    print(f'\x1b[92m{arg0} \x1b[32m{s:.3f}\x1b[0m')
    return args

o_file = open('test.txt','w')
o_lock = mp.Lock()

tasks = [
    [0, 0,1, o_file,o_lock],
    [1, 2,3, o_file,o_lock],
    [2, 4,5, o_file,o_lock],
    [3, 6,7, o_file,o_lock],
]

with mp.Pool(2) as pool:
    results = pool.map(thr_work00, tasks)
    for res in results:
        print(res)
When I pass the file, I get: TypeError: cannot serialize '_io.TextIOWrapper' object.
When I pass the lock, I get: RuntimeError: Lock objects should only be shared between processes through inheritance.
How can I solve this?
Edit:
So I'm wondering whether the following is OK (it seems to be working). My only concern is that each write is atomic by itself; the order in which the writes complete doesn't matter.
import multiprocessing as mp
import time
import random
import os

# ----------------------------------------------------------------
def thr_work00(args):
    arg0 = args[0]
    arg1 = args[1]
    s = random.random()/10
    time.sleep(s)
    txt = 1004*str(arg0)
    with open('test.txt','a') as o_file:
        o_file.write(f'{txt}\n')
    print(f'\x1b[92m{arg0} \x1b[32m{s:.3f}\x1b[0m')
    return args

# ----------------------------------------------------------------
os.remove('test.txt')

tasks = [
    [0, 0xf0],
    [1, 0xf1],
    [2, 0xf2],
    [3, 0xf3],
    [4, 0xf4],
    [5, 0xf5],
    [6, 0xf6],
    [7, 0xf7],
]

with mp.Pool(2) as pool:
    results = pool.map(thr_work00, tasks)
    for res in results:
        print(res)
Solution
For both the lock and the open file descriptor, you should share them with the worker processes through inheritance rather than trying to pass them as arguments. Child processes inherit all open file descriptors from their parent, so you can write the code like this:
import multiprocessing as mp
import time
import random

def thr_work00(args):
    global o_lock, o_file
    s = random.randint(0, 5)
    with o_lock:
        time.sleep(s)
        print(f"\x1b[92m{args[0]} \x1b[32m{s}\x1b[0m")
        o_file.write(f"{args[0]} {s}\n")
        o_file.flush()
    return args

with open("test.txt", "w") as o_file:
    o_lock = mp.Lock()

    tasks = [
        [0, 0, 1],
        [1, 2, 3],
        [2, 4, 5],
        [3, 6, 7],
    ]

    with mp.Pool(2) as pool:
        results = pool.map(thr_work00, tasks)
        for res in results:
            print(res)
Alternatively, instead of writing to the file in the workers, do the writing in the main process as you collect the results. That eliminates the need for a lock, since you no longer have multiple processes writing to the same file...
...or, if you need the writes to happen "live" rather than all at the end, use a Queue.
Here is an example that uses a queue to hand the results to a dedicated writer:
import multiprocessing as mp
import time
import random

resultq = mp.Queue()

def thr_work00(args):
    global resultq
    s = random.randint(0, 5)
    print(f"\x1b[92m{args[0]} \x1b[32m{s}\x1b[0m")
    time.sleep(s)
    resultq.put((args[0], s))
    return args

def thr_writer():
    global resultq
    print('writer start')
    with open('test.txt', 'w') as fd:
        while True:
            item = resultq.get()
            if item is None:
                break
            fd.write(f'{item[0]}: {item[1]}\n')
    print('writer exit')

writer = mp.Process(target=thr_writer)
writer.start()

tasks = [
    [0, 0, 1],
    [1, 2, 3],
    [2, 4, 5],
    [3, 6, 7],
]

with mp.Pool(2) as pool:
    results = pool.map(thr_work00, tasks)
    for res in results:
        print(res)

resultq.put(None)
writer.join()