首页 > 解决方案 > 当一组任务依赖于另一组时,如何在 python 中同时完成两组任务?

问题描述

我有大量的小文件要从 s3 下载和处理。

下载速度相当快,因为​​每个文件只有几兆字节。它们加在一起约为 100GB。处理时间大约是下载时间的两倍,并且纯粹受 CPU 限制。因此,通过在下载其他文件的同时在多个线程中完成处理,应该可以缩短整体运行时间。

目前,我正在下载一个文件,处理它并继续下一个文件。python中有没有一种方法可以让我一个接一个地下载所有文件并在完成下载后立即处理每个文件?这里的关键区别在于,当每个文件都在处理时,另一个总是在下载。

我的代码如下所示:

files = {'txt': ['filepath1', 'filepath2', ...], 
         'tsv': ['filepath1', 'filepath2', ...]
        } 

for kind in files.keys():
    subprocess.check_call(f'mkdir -p {kind}', shell=True)
    subprocess.call(f'mkdir -p {kind}/normalized', shell=True)

    for i, file in enumerate(files[kind]):
        subprocess.call(f'aws s3 cp s3://mys3bucket.com/{file} {kind}/', shell=True)
        f = file.split('/')[-1]
        subprocess.check_call('my_process_function --input "{kind}/{f}" --output "{kind}/normalized/normalize_{f}" --units relab', shell=True)

我还编写了一个多处理解决方案,我可以同时下载和处理多个文件,但这并没有提高速度,因为网络速度已经饱和。瓶颈在于处理。我已经包含它以防万一它对你们有帮助。

from contextlib import closing
from os import cpu_count
from multiprocessing import Pool

def download_and_proc(file, kind='txt'):
    subprocess.call(f'aws s3 cp s3://mys3bucket.com/{file} {kind}/', shell=True)
    f = file.split('/')[-1]
    subprocess.check_call('my_process_function --input "{kind}/{f}" --output "{kind}/normalized/normalize_{f}" --units relab', shell=True)

with closing(Pool(processes=cpu_count()*2)) as pool:
        pool.map(download_and_proc, files)

标签: pythonmultithreadingmultiprocessing

解决方案


从长远来看,您当前的多处理代码应该非常接近最优。它不会总是以最大速度下载,因为负责下载文件的相同执行线程将等到文件处理完毕后再下载另一个文件。但它通常应该在处理过程中消耗所有 CPU,即使某些网络容量未使用。如果您也尝试始终下载,您最终会用完要下载的文件,并且网络将在相同的时间内空闲,只是在批处理作业结束时。

一个可能的例外是处理文件所花费的时间总是完全相同。然后您可能会发现您的工作人员以同步方式运行,他们都同时下载,然后所有进程同时处理,即使工作人员数量多于可供它们运行的​​ CPU 数量。除非处理以某种方式与实时时钟相关联,否则这似乎不会持续很长时间。大多数情况下,您会先完成某些流程,因此下载最终会错开。

所以改进你的代码不太可能给你带来太多的加速。如果你认为你需要它,你可以将下载和处理分成两个独立的池。甚至可以在主进程中将其中一个作为单进程循环执行,但我将在这里展示完整的两池版本:

def download_worker(file, kind='txt'):
    subprocess.call(f'aws s3 cp s3://mys3bucket.com/{file} {kind}/', shell=True)
    return file

def processing_worker(file, kind='txt')
    f = file.split('/')[-1]
    subprocess.check_call('my_process_function --input "{kind}/{f}" --output "{kind}/normalized/normalize_{f}" --units relab', shell=True)

with Pool() as download_pool, Pool() as processing_pool:
    downloaded_iterator = download_pool.imap(download_worker, files)  # imap returns an iterator
    processing_pool.map(processing_worker, downloaded_iterator)

这应该尽可能快地下载和处理您的系统。如果文件下载所需的时间少于其处理时间,那么很可能第一个池将在第二个池之前完成,代码将处理得很好。如果处理不是瓶颈,它也会支持(第二个池有时会空闲,等待文件完成下载)。


推荐阅读