Can't pass file handles and locks to processes via multiprocessing.Pool?

Problem description

I'm using multiprocessing.Pool() to start a bunch of processes, each of which writes to the same file (using a lock).

Each process is assigned a "task", which is simply a tuple of arguments.

One of those arguments is a file handle, and another is a lock.

But Python won't let me pass either the file handle or the lock.

(I can do both of these things when using multiprocessing.Process directly instead of multiprocessing.Pool.)

Example:

import multiprocessing as mp
import time
import random

def thr_work00(args):
  arg0 = args[0]
  arg1 = args[1]
  arg2 = args[2]
  arg3 = args[3]
  arg4 = args[4]
  s = random.random()/10
  time.sleep(s)
  print(f'\x1b[92m{arg0} \x1b[32m{s:.3f}\x1b[0m')
  return args

o_file = open('test.txt','w')
o_lock = mp.Lock()

tasks = [
  [0, 0,1, o_file,o_lock],
  [1, 2,3, o_file,o_lock],
  [2, 4,5, o_file,o_lock],
  [3, 6,7, o_file,o_lock],
]

with mp.Pool(2) as pool:
  results = pool.map(thr_work00, tasks)
  for res in results:
    print(res)

When passing the file handle, I get: TypeError: cannot serialize '_io.TextIOWrapper' object.

When passing the lock, I get: RuntimeError: Lock objects should only be shared between processes through inheritance.

How can I work around this?


Edit:

So I'm wondering whether the following is OK (it seems to be working). My only requirement is that each individual write is atomic; the order in which the writes complete doesn't matter.

import multiprocessing as mp
import time
import random
import os

# ----------------------------------------------------------------
def thr_work00(args):
  arg0 = args[0]
  arg1 = args[1]
  s = random.random()/10
  time.sleep(s)
  txt = 1004*str(arg0)
  with open('test.txt','a') as o_file:
    o_file.write(f'{txt}\n')
  print(f'\x1b[92m{arg0} \x1b[32m{s:.3f}\x1b[0m')
  return args

# ----------------------------------------------------------------
if os.path.exists('test.txt'):
  os.remove('test.txt')  # avoid FileNotFoundError on the first run

tasks = [
  [0, 0xf0],
  [1, 0xf1],
  [2, 0xf2],
  [3, 0xf3],
  [4, 0xf4],
  [5, 0xf5],
  [6, 0xf6],
  [7, 0xf7],
]

with mp.Pool(2) as pool:
  results = pool.map(thr_work00, tasks)
  for res in results:
    print(res)

Tags: python, file, multiprocessing, threadpool, python-multiprocessing

Solution


For both the lock and the open file descriptor, you should share them with the worker processes through inheritance rather than trying to pass them as arguments. A child process inherits all open file descriptors from its parent, so you can write the code like this:

import multiprocessing as mp
import time
import random


def thr_work00(args):
    global o_lock, o_file

    s = random.randint(0, 5)
    with o_lock:
        time.sleep(s)
        print(f"\x1b[92m{args[0]} \x1b[32m{s}\x1b[0m")
        o_file.write(f"{args[0]} {s}\n")
        o_file.flush()
    return args


with open("test.txt", "w") as o_file:
    o_lock = mp.Lock()

    tasks = [
        [0, 0, 1],
        [1, 2, 3],
        [2, 4, 5],
        [3, 6, 7],
    ]

    with mp.Pool(2) as pool:
        results = pool.map(thr_work00, tasks)
        for res in results:
            print(res)
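
Note that inheriting globals like this relies on the fork start method (the default on Linux); under spawn (the default on Windows and recent macOS), workers re-import the module and do not inherit o_file or o_lock. A portable sketch, under the assumption that each worker may simply reopen the file in append mode, passes the lock through the Pool's initializer instead (the names init_worker and work are illustrative):

```python
import multiprocessing as mp


def init_worker(lock):
    # Runs once in every worker process; stash the lock in a global.
    global o_lock
    o_lock = lock


def work(n):
    # Reopen the file in append mode instead of inheriting a handle.
    with o_lock, open('test.txt', 'a') as f:
        f.write(f'{n}\n')
    return n


if __name__ == '__main__':
    lock = mp.Lock()
    open('test.txt', 'w').close()  # truncate the output file first
    with mp.Pool(2, initializer=init_worker, initargs=(lock,)) as pool:
        results = pool.map(work, range(4))
    print(results)  # map preserves task order: [0, 1, 2, 3]
```

This works under every start method, because the lock is handed to each worker at startup rather than smuggled in through module globals.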

Alternatively, don't write to the file in the workers at all; do the writing in the main process as you collect the results. That eliminates the need for a lock, since you no longer have multiple processes writing to the same file...

...or, if you need the writes to happen "live" rather than all at the end, use a Queue.


Here is an example that uses a queue to hand results off to a dedicated writer process:

import multiprocessing as mp
import time
import random

resultq = mp.Queue()


def thr_work00(args):
    global resultq
    s = random.randint(0, 5)
    print(f"\x1b[92m{args[0]} \x1b[32m{s}\x1b[0m")
    time.sleep(s)
    resultq.put((args[0], s))
    return args


def thr_writer():
    global resultq
    print('writer start')
    with open('test.txt', 'w') as fd:
        while True:
            item = resultq.get()
            if item is None:
                break
            fd.write(f'{item[0]}: {item[1]}\n')
    print('writer exit')


writer = mp.Process(target=thr_writer)
writer.start()

tasks = [
    [0, 0, 1],
    [1, 2, 3],
    [2, 4, 5],
    [3, 6, 7],
]

with mp.Pool(2) as pool:
    results = pool.map(thr_work00, tasks)
    for res in results:
        print(res)

resultq.put(None)  # sentinel: tells the writer to exit
writer.join()
