python - 如何在 Dataflow 中启用并行读取文件?
问题描述
我正在开发一个 Dataflow 管道,该管道从 GCS 读取 1000 个文件(每个 50 MB),并对所有文件的行执行一些计算。每个文件都是具有相同结构的 CSV,只是其中的数字不同,我正在计算所有文件中每个单元格的平均值。
管道看起来像这样(python):
additional_side_inputs = {'key1': 'value1', 'key2': 'value2'} # etc.
p | 'Collect CSV files' >> MatchFiles(input_dir + "*.csv")
| 'Read files' >> ReadMatches()
| 'Parse contents' >> beam.ParDo(FileToRowsFn(), additional_side_inputs)
| 'Compute average' >> beam.CombinePerKey(AverageCalculatorFn())
该类FileToRowsFn
看起来像这样(见下文,省略了一些细节)。是第一列,row_id
是每一行的唯一键;它在每个文件中只存在一次,因此我可以计算所有文件的平均值。有一些附加值作为变压器的侧输入提供,这些值未显示在下面的方法主体中,但仍被实际实现使用。此值是在管道之外创建的字典。我在这里提到它,以防这可能是缺乏并行化的原因。
class FileToRowsFn(beam.DoFn):
def process(self, file_element, additional_side_inputs):
with file_element.open() as csv_file:
for row_id, *values in csv.reader(TextIOWrapper(csv_file, encoding='utf-8')):
yield row_id, values
这AverageCalculatorFn
是一个典型beam.CombineFn
的累加器,它对所有文件中具有相同 row_id 的所有行执行给定行的每个单元格的平均值。
所有这些工作正常,但性能和吞吐量存在问题:执行此管道需要 60 多个小时。在监控控制台中,我注意到文件是按顺序读取的(每 2 分钟读取 1 个文件)。我知道读取一个文件可能需要 2 分钟(每个文件为 50 MB),但我不明白为什么数据流不分配更多的工作人员来并行读取多个文件。cpu 保持在 ~2-3%,因为大部分时间都花在了文件 IO 上,并且 worker 的数量不超过 2(尽管没有设置限制)。
输出ReadMatches
是 1000 条文件记录,那么为什么数据流不创建大量FileToRowsFn
实例并将它们分派给新工作人员,每个工作人员处理一个文件?
有没有办法强制执行这种行为?
解决方案
这可能是因为您的所有步骤都被 Dataflow 运行器融合为一个步骤。
对于这样一个融合的捆绑包要并行化,第一步需要是可并行化的。在您的情况下,这是一个不可并行化的全局扩展。
为了使您的管道可并行化,您可以尝试中断融合。这可以通过添加一个Reshuffle转换作为产生许多元素的步骤之一的消费者来完成。
例如,
from apache_beam import Reshuffle
additional_side_inputs = {'key1': 'value1', 'key2': 'value2'} # etc.
p | 'Collect CSV files' >> MatchFiles(input_dir + "*.csv")
| 'Read files' >> ReadMatches()
| 'Reshuffle' >> Reshuffle()
| 'Parse contents' >> beam.ParDo(FileToRowsFn(), additional_side_inputs)
| 'Compute average' >> beam.CombinePerKey(AverageCalculatorFn())
textio.ReadFromText()
如果您使用 Beam 中可用的标准源之一(例如读取数据),则不必这样做。(不幸的是,我们没有 CSV 源,但ReadFromText
支持跳过标题行)。
有关融合优化和防止融合的更多信息,请参见此处。
推荐阅读
- airflow - 气流 dag conf 值
- scala - Scala 3 上下文抽象与重载冲突
- java - 为什么 java.net.useSystemProxies 似乎不起作用?
- asp.net - 如何在 APS.net 网络表单中添加新行项目并提交到数据库
- django - 如何将引导程序应用于我的整个 django 项目?
- javascript - 如何在页面加载时在反应打字稿中设置可见性
- android - 为什么我的 Manifest 声明的广播接收器会在一段时间后停止监听显式广播?
- python - 在 Altair 图表中自定义工具提示
- input - NVTabular Graph:工作流图中的重复输入
- python - 在 Django 中保留 API 视图和模板视图的最佳方法