首页 > 解决方案 > 串行到并行到串行数据处理的模式

问题描述

我正在处理数据集数组,遍历每个数据集以提取信息,并使用提取的信息构建一个新数据集,然后将其传递给可能对数据执行并行 I/O(请求)的并行处理函数。

返回的是一个包含新信息的新数据集数组,然后我必须将其与前一个合并。模式最终是 Loop->parallel->Loop。

parallel_request = []
for item in dataset:
     transform(item)
     subdata = extract(item)
     parallel_request.append(subdata)

new_dataset = parallel_function(parallel_request)

for item in dataset:
    transform(item)
    subdata = extract(item)
    if subdata in new_dataset:
        item[subdata] = new_dataset[subdata]

我被迫使用两个循环。一次构建并行请求,再次将并行结果与我的旧数据合并。这些循环中的大块最终会重复步骤。这种模式在我的代码中变得非常普遍和重复。

在向 中添加数据后,是否有一些技术可以在第一个循环中“屈服” parallel_request,继续下一个项目。一旦parallel_request被填充,执行parallel function,然后再次为每个项目恢复循环,恢复先前保存的上下文(局部变量)。

编辑:我认为一种解决方案是使用函数而不是循环,并递归调用它。缺点是我肯定会达到递归限制。

parallel_requests = []
final_output = []
index = 0
def process_data(dataset, last=False):
    data = dataset[index]
    data2 = transform(data)
    data3 = expensive_slow_transform(data2)
    subdata = extract(data3)
    # ... some other work

    index += 1

    parallel_requests.append(subdata)

    # If not last, recurse
    # Otherwise, call the processing function.
    if not last:
        process_data(dataset, index == len(dataset))
    else:
        new_data = process_requests(parallel_requests)

    # Now processing of each item can resume, keeping it's
    # local data variables, transforms, subdata...etc.
    final_data = merge(subdata, new_data[index], data, data2, data3))
    final_output.append(final_data)

process_data(original_dataset)

任何解决方案都将涉及以某种方式保留数据、data2、data3、子数据...等,这些数据必须存储在某个地方。递归使用堆栈来存储它们,这将触发递归限制。另一种方法是将它们存储在循环之外的某个数组中,这会使代码更加麻烦。另一种解决方案是重新计算它们,并且还需要重复代码。

所以我怀疑要实现这一点,你需要一些特定的 Python 工具来实现这一点。

标签: python

解决方案


我相信我已经解决了这个问题:

基于前面的递归代码,您可以利用 Python 提供的生成器工具在调用并行函数时保留串行上下文:

def process_data(dataset, parallel_requests, final_output):
    data = dataset[index]
    data2 = transform(data)
    data3 = expensive_slow_transform(data2)
    subdata = extract(data3)
    # ... some other work

    parallel_requests.append(subdata)

    yield

    # Now processing of each item can resume, keeping it's
    # local data variables, transforms, subdata...etc.
    final_data = merge(subdata, new_data[index], data, data2, data3))
    final_output.append(final_data)

final_output = []
parallel_requests = []
funcs = [process_data(datum, parallel_requests, final_output) for datum in dataset]
[next(f) for f in funcs]
process_requests(parallel_requests)
[next(f) for f in funcs]

输出列表和生成器调用足够通用,您可以在辅助函数中抽象出这些行,设置它并为您调用生成器,从而产生非常干净的结果,代码开销是函数定义的一行,而一行呼叫助手。


推荐阅读