首页 > 解决方案 > 如何使用并行处理加速 python 函数?

问题描述

我有两个功能。每个函数都运行一个 for 循环。

def f1(df1, df2):
    final_items = []
    for ind, row in df1.iterrows():
        id = row['Id']
        some_num = row['some_num']
        timestamp = row['Timestamp']
        res = f2(df=df2, id=id, some_num=some_num, timestamp=timestamp))
        final_items.append(res)

return final_items

def f2(df, id, some_num, timestamp):
    for ind, row in df.iterrows():        
        filename = row['some_filename']
        dfx = reader(key=filename) # User defined; object reader
        # Assign variables
        st_ID = dfx["Id"]
        st_some_num = dfx["some_num"]
        st_time_first = dfx['some_first_time_variable']
        st_time_last = dfx['some_last_time_variable']        

        if device_id == st_ID and some_num == st_some_num:
            if st_time_first <= timestamp and st_time_last >= timestamp:
                return filename
            else:
                return None
        else:
            continue

如图所示,第一个函数调用第二个函数。第一个循环发生2000 次,即第一个数据帧中有 2000 行。

第二个函数(从 调用的函数f1())运行1000 万次

我的目标是加速f2()使用并行处理。我曾尝试使用像 Multiprocessing 和 Ray 这样的 python 包,但我是并行处理领域的新手,并且由于缺乏经验而遇到了很多障碍。

有人可以帮我加快函数速度,从而减少执行 1000 万行的时间吗?

标签: pythonpython-3.xparallel-processingmultiprocessingray

解决方案


FACTS :
初始公式要求2E3f1()请求f2()扫描1E7共享”中的行df2
以便调用一个未指定的reader()进程来接收一些其他数据以决定进一步处理或返回

f2()我的目标是使用并行处理来加快速度

有人可以帮助我加快功能,以便执行 1000 万行所需的时间要少得多吗?


惊喜一:这不是并行处理的用例

正如上面所阐述的,这个问题多次调用文件 I/O 操作,这些操作[PARALLEL]在物理存储级别上永远不会正确,是吗?绝不。任何和所有智能文件 I/O-(预)缓存和滑动窗口文件 I/O 技巧都无法帮助处理中等水平的[CONCURRENT]工作负载,如果超出主要工作负载一步,通常会造成严重破坏由于内存资源和 I/O 总线宽度 x 速度的物理限制范围以及最弱链元素的延迟在仍在增长的流量负载下增加。

工作流控制迭代器是纯粹的[SERIAL]“工作调度程序”,它们一个接一个地依次遍历它们的值域,并只订购另一个文件以进行(再次迭代地)处理。


惊喜二:矢量化无济于事

虽然向量化操作对于许多向量/矩阵/张量处理方案(喜欢使用numpy+ numba)来说是智能的,但 Condicio Sine Qua Non 是,问题必须是:

  1. 紧凑” - 以便通过矢量化语法技巧轻松表达,这个原始的 -row-after-row-after-row在“远程”文件内容[SERIAL]中找到第一个也是唯一的第一个device_ID匹配”,接下来return None if not ( <exprA> and <exprB> ) else filename

  2. 统一”,即非顺序“直到”某事第一次发生 - 向量化非常适合用智能内部代码“覆盖”整个 N 维空间,用于(最佳)正交子结构统一“跨”整个处理空间。相反,在第一次出现匹配后,矢量化很难重新排序“返回”以阻止(毒化)任何进一步的智能产生结果......(上面的参考文献1“首先找到并且只有第一个发生(和死亡/返回))

  3. “内存足够大”,即给定任何附加逻辑被添加到矢量化任务,每当代码要求矢量化引擎使用某种 - 子句处理 N-dim “数据”时,这种 - 条件where(...)的临时产品where(...)正在消耗额外[SPACE]的 -footprint(在 RAM 中最好,在 SWAP-file-I/O 中更糟),并且这种额外的内存占用可能很快会破坏矢量化处理重新制定的想法的所有好处(不谈论由于如此巨大的额外内存分配需求会导致整个流程流的交换文件 I/O 窒息) -行上where(...)的子句10E6是昂贵的,一旦全局策略执行那么1 < nCPUs < 2E3多次(如上所述,矢量化统一进行“跨越“整个数据范围,没有顺序有益的捷径可以在第一次且只有第一次匹配后停止......)


最好的下一步:依赖图 -> 延迟 -> 瓶颈

上面表述的问题是公正[CONCURRENT]处理,其中“共享”资源使用的实际阻塞或可用性限制了整体处理持续时间。使用的资源不超过一组给定的资源,因此没有什么神奇的机会可以加快并发使用模式以加快处理速度。因此,要利用的免费资源的“数量”及其各自的响应“延迟” 肯定是那些处于高水平并发工作负载之下的,而不是理想主义的、空载的响应时间**

  • 如果您没有分析数据,请至少测量/基准测试主要特征持续时间

a) 中的每行f2()进程延迟[ min, Avg, MAX, StDev][us]

b)reader()相关[ min, Avg, MAX, StDev]的设置/检索延迟[us]

  • 测试,reader()性能是否代表瓶颈 - 任意增加并发操作流程的上限

如果是这样,您将获得它可以处理的最大工作负载,并且基于此,并发处理可能会将速度提高到这个reader()确定的性能上限。

其余的都是基本的。


结语:

这种延迟数据设计的、(不可)避免的、瓶颈感知的、大小合适的并发处理设置可实现最大延迟屏蔽,这大约是在这里可以提供帮助的最大值。

如果有机会重新设计和重构全局策略,可能会有更快的处理时间,但这可能不是来自于顺序迭代器[SERIAL]的纯串联,它们指示对未知代码的 about调用序列。~ 20.000.000.000reader()

然而,这超出了 StackOverflow M in C un VE问题定义的范围。

希望你喜欢这篇文章,并且它可以激发一些关于如何更快地获得结果的新观点。聪明的想法可能会导致处理时间从几天缩短到几分钟(!)。这样做了几次之后,没有人会相信,如果您通过设计适合他们业务领域的大小合适的解决方案来实现这样的最终解决方案,那么完成这项艰苦的工作可能会让您和您的客户都受益。


推荐阅读