python - 串行到并行到串行数据处理的模式
问题描述
我正在处理数据集数组,遍历每个数据集以提取信息,并使用提取的信息构建一个新数据集,然后将其传递给可能对数据执行并行 I/O(请求)的并行处理函数。
返回的是一个包含新信息的新数据集数组,然后我必须将其与前一个合并。模式最终是 Loop->parallel->Loop。
parallel_request = []
for item in dataset:
transform(item)
subdata = extract(item)
parallel_request.append(subdata)
new_dataset = parallel_function(parallel_request)
for item in dataset:
transform(item)
subdata = extract(item)
if subdata in new_dataset:
item[subdata] = new_dataset[subdata]
我被迫使用两个循环。一次构建并行请求,再次将并行结果与我的旧数据合并。这些循环中的大块最终会重复步骤。这种模式在我的代码中变得非常普遍和重复。
在向 中添加数据后,是否有一些技术可以在第一个循环中“屈服” parallel_request
,继续下一个项目。一旦parallel_request
被填充,执行parallel function
,然后再次为每个项目恢复循环,恢复先前保存的上下文(局部变量)。
编辑:我认为一种解决方案是使用函数而不是循环,并递归调用它。缺点是我肯定会达到递归限制。
parallel_requests = []
final_output = []
index = 0
def process_data(dataset, last=False):
data = dataset[index]
data2 = transform(data)
data3 = expensive_slow_transform(data2)
subdata = extract(data3)
# ... some other work
index += 1
parallel_requests.append(subdata)
# If not last, recurse
# Otherwise, call the processing function.
if not last:
process_data(dataset, index == len(dataset))
else:
new_data = process_requests(parallel_requests)
# Now processing of each item can resume, keeping it's
# local data variables, transforms, subdata...etc.
final_data = merge(subdata, new_data[index], data, data2, data3))
final_output.append(final_data)
process_data(original_dataset)
任何解决方案都将涉及以某种方式保留数据、data2、data3、子数据...等,这些数据必须存储在某个地方。递归使用堆栈来存储它们,这将触发递归限制。另一种方法是将它们存储在循环之外的某个数组中,这会使代码更加麻烦。另一种解决方案是重新计算它们,并且还需要重复代码。
所以我怀疑要实现这一点,你需要一些特定的 Python 工具来实现这一点。
解决方案
我相信我已经解决了这个问题:
基于前面的递归代码,您可以利用 Python 提供的生成器工具在调用并行函数时保留串行上下文:
def process_data(dataset, parallel_requests, final_output):
data = dataset[index]
data2 = transform(data)
data3 = expensive_slow_transform(data2)
subdata = extract(data3)
# ... some other work
parallel_requests.append(subdata)
yield
# Now processing of each item can resume, keeping it's
# local data variables, transforms, subdata...etc.
final_data = merge(subdata, new_data[index], data, data2, data3))
final_output.append(final_data)
final_output = []
parallel_requests = []
funcs = [process_data(datum, parallel_requests, final_output) for datum in dataset]
[next(f) for f in funcs]
process_requests(parallel_requests)
[next(f) for f in funcs]
输出列表和生成器调用足够通用,您可以在辅助函数中抽象出这些行,设置它并为您调用生成器,从而产生非常干净的结果,代码开销是函数定义的一行,而一行呼叫助手。
推荐阅读
- vb.net - 为什么保存后我的 JPG 图像质量(颜色)比原始文件更暗?
- python - Drawing a common horizontal line for multiple subplots
- vba - Create QueryDef without a name in MS Access
- c# - 数据集的查询执行失败。(rsErrorExecutingCommand)
- java - 如何使用 logback 在日志文件中记录表?
- perl - 为什么 Data::Dumper 显示链接到其他值的值?
- flatpak - What's a flatpak runtime?
- spring - 如何验证“授权:承载
“与骡子3.8? - c# - 窗体上的某些控件未在 C# windows 窗体中的某些窗体上调整大小
- r - 解决错误“对象
在地理编码上下文中找不到”