首页 > 解决方案 > Apache Beam TextIO.read 然后合并成批处理

问题描述

在使用 TextIO.read 获取PCollection<String>单个行之后,是否可以使用某种组合转换成批次(例如 25 组)?所以返回类型最终会看起来像:PCollection<String, List<String>>. 看起来应该可以使用某种 . CombineFn,但 API 对我来说还是有点神秘。

这里的上下文是我正在读取 CSV 文件(可能非常大),解析 + 处理这些行并将它们转换为 JSON,然后调用 REST API ......但我不想为每个文件打 REST API单独一行,因为 REST API 一次支持多个项目(最多 1000 个,所以不是整个批次)。

标签: google-cloud-dataflowapache-beam

解决方案


我想你可以像下面那样做一些简单的批处理(使用有状态的 API)。您要保持的状态BatchingFn是行的当前缓冲区或self._lines. 抱歉,我是在python(不熟悉 Java API)中做到的

from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo

MY_BATCH_SIZE = 512

class BatchingFn(DoFn):
  def __init__(self, batch_size=100):
    self._batch_size = batch_size

  def start_bundle(self):
    # buffer for string of lines
    self._lines = []

  def process(self, element):
    # Input element is a string (representing a CSV line)
    self._lines.append(element)
    if len(_lines) >= self._batch_size:
      self._flush_batch()

  def finish_bundle(self):
    # takes care of the unflushed buffer before finishing
    if self._lines:
      self._flush_batch()

  def _flush_batch(self):
    #### Do your REST API call here with self._lines
    # .....
    # Clear the buffer.
    self._lines = []

# pcoll is your PCollection of lines.
(pcoll | 'Call Rest API with batch data' >> ParDo(BatchingFn(MY_BATCH_SIZE)))

关于使用Data-driven triggers,可以参考Beam/Dataflow 中的 Batch PCollection


推荐阅读