python - Is this the most I can get out of Python multiprocessing?
Problem
I have data in a text file. Each line is one computation to do. The file has about 100,000,000 lines.
First I load everything into memory, then I have a method that performs the computation and returns the result:
def process(data_line):
    # do computation
    return result
Then I call it with packets of 2000 lines, like this, and save the results to disk:
POOL_SIZE = 15  # nbcore - 1
PACKET_SIZE = 2000

pool = Pool(processes=POOL_SIZE)
data_lines = util.load_data_lines(to_be_computed_filename)
number_of_lines = len(data_lines)
number_of_packets = int(number_of_lines / PACKET_SIZE)
for i in range(number_of_packets):
    lines_packet = data_lines[:PACKET_SIZE]
    data_lines = data_lines[PACKET_SIZE:]
    results = pool.map(process, lines_packet)
    save_computed_data_to_disk(to_be_computed_filename, results)

# process the last packet, which is smaller
results.extend(pool.map(process, data_lines))
save_computed_data_to_disk(to_be_computed_filename, results)
print("Done")
The problem is that while I'm writing to disk, my CPUs aren't computing anything, even though I have 8 cores. Looking at Task Manager, it seems quite a lot of CPU time is being lost.
I have to write to disk after finishing the computation because the results are 1000 times bigger than the input. Either way, I have to write to disk at some point; if the time isn't lost here, it will be lost later.
What could I do to let one core write to disk while the others keep computing? Switch to C?
At this rate I can process 100 million lines in 75 hours, but I have 12 billion lines to process, so any improvement is welcome.
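(One incremental option, short of restructuring the whole program, is to stream results out of the pool with `Pool.imap` and hand writing to a background thread, so the workers keep computing while results are saved. A minimal sketch; `process` is a stand-in for the question's real computation and the writer thread stands in for `save_computed_data_to_disk`:

```python
import threading
import queue
from multiprocessing import Pool

def process(line):
    # Stand-in for the question's real computation.
    return line * 2

def writer_loop(q, out):
    # Drain results and "write" them while the pool keeps computing;
    # the real program would call save_computed_data_to_disk here.
    while True:
        item = q.get()
        if item is None:  # sentinel: no more results
            break
        out.append(item)

def run(lines, pool_size=4):
    q = queue.Queue(maxsize=10_000)  # bounded, so memory stays capped
    out = []
    t = threading.Thread(target=writer_loop, args=(q, out))
    t.start()
    with Pool(pool_size) as pool:
        # imap streams results back in order as they finish, so the
        # writer thread can save while the workers compute.
        for result in pool.imap(process, lines, chunksize=2000):
            q.put(result)
    q.put(None)
    t.join()
    return out
```

A plain thread is enough for the writer because disk I/O releases the GIL; the bounded queue keeps the 1000x-larger results from piling up in memory faster than they can be written.)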
Timing sample:
Processing packet 2/15 953 of C:/processing/drop_zone\to_be_processed_txt_files\t_to_compute_303620.txt
Launching task and waiting for it to finish...
Task completed, Continuing
Packet was processed in 11.534576654434204 seconds
We are currently going at a rate of 0.002306915330886841 sec/words
Which is 433.47928145051293 words per seconds
Saving in temporary file
Printing writing 5000 computed line to disk took 0.04400920867919922 seconds
saving word to resume from : 06 20 25 00 00
Estimated time for processing the remaining packets is : 51:19:25
Solution
Note: SharedMemory requires Python >= 3.8, since that is when it was first introduced.
Launch 3 kinds of processes: Reader, Processor(s), Writer.
Have the Reader process read the file incrementally, sharing what it reads via shared_memory and passing references through a Queue.
Have the Processors consume the queue, work on the shared_memory, and return the results through another queue, again as shared_memory.
Have the Writer process consume the second queue and write to the destination file.
Have them all communicate through some Events or a DictProxy with the MainProcess, which acts as an orchestrator.
Example:
import time
import random
import hashlib
import multiprocessing as MP
from queue import Queue, Empty
# noinspection PyCompatibility
from multiprocessing.shared_memory import SharedMemory
from typing import Dict, List


def readerfunc(
    shm_arr: List[SharedMemory], q_out: Queue, procr_ready: Dict[str, bool]
):
    numshm = len(shm_arr)
    for batch in range(1, 6):
        print(f"Reading batch #{batch}")
        for shm in shm_arr:
            #### Simulated Reading ####
            for j in range(0, shm.size):
                shm.buf[j] = random.randint(0, 255)
            #### ####
            q_out.put((batch, shm))
        # Need to sync here because we're reusing the same SharedMemory,
        # so gotta wait until all processors are done before sending the
        # next batch
        while not q_out.empty() or not all(procr_ready.values()):
            time.sleep(1.0)


def processorfunc(
    q_in: Queue, q_out: Queue, suicide: type(MP.Event()), procr_ready: Dict[str, bool]
):
    pname = MP.current_process().name
    procr_ready[pname] = False
    while True:
        time.sleep(1.0)
        procr_ready[pname] = True
        if q_in.empty() and suicide.is_set():
            break
        try:
            batch, shm = q_in.get_nowait()
        except Empty:
            continue
        print(pname, "got batch", batch)
        procr_ready[pname] = False
        #### Simulated Processing ####
        h = hashlib.blake2b(shm.buf, digest_size=4, person=b"processor")
        time.sleep(random.uniform(5.0, 7.0))
        #### ####
        q_out.put((pname, h.hexdigest()))


def writerfunc(q_in: Queue, suicide: type(MP.Event())):
    while True:
        time.sleep(1.0)
        if q_in.empty() and suicide.is_set():
            break
        try:
            pname, digest = q_in.get_nowait()
        except Empty:
            continue
        print("Writing", pname, digest)
        #### Simulated Writing ####
        time.sleep(random.uniform(3.0, 6.0))
        #### ####
        print("Writing", pname, digest, "done")


def main():
    shm_arr = [
        SharedMemory(create=True, size=1024)
        for _ in range(0, 5)
    ]
    q_read = MP.Queue()
    q_write = MP.Queue()
    procr_ready = MP.Manager().dict()
    poison = MP.Event()
    poison.clear()
    reader = MP.Process(target=readerfunc, args=(shm_arr, q_read, procr_ready))
    procrs = []
    for n in range(0, 3):
        p = MP.Process(
            target=processorfunc, name=f"Proc{n}", args=(q_read, q_write, poison, procr_ready)
        )
        procrs.append(p)
    writer = MP.Process(target=writerfunc, args=(q_write, poison))
    reader.start()
    [p.start() for p in procrs]
    writer.start()
    reader.join()
    print("Reader has ended")
    while not all(procr_ready.values()):
        time.sleep(5.0)
    poison.set()
    [p.join() for p in procrs]
    print("Processors have ended")
    writer.join()
    print("Writer has ended")
    [shm.close() for shm in shm_arr]
    [shm.unlink() for shm in shm_arr]


if __name__ == '__main__':
    main()