google-cloud-dataflow - Apache Beam TextIO.read 然后合并成批处理
问题描述
在使用 TextIO.read 获取PCollection<String>
单个行之后,是否可以使用某种组合转换成批次(例如 25 组)?所以返回类型最终会看起来像:PCollection<String, List<String>>
. 看起来应该可以使用某种 . CombineFn
,但 API 对我来说还是有点神秘。
这里的上下文是我正在读取 CSV 文件(可能非常大),解析 + 处理这些行并将它们转换为 JSON,然后调用 REST API ......但我不想为每个文件打 REST API单独一行,因为 REST API 一次支持多个项目(最多 1000 个,所以不是整个批次)。
解决方案
我想你可以像下面那样做一些简单的批处理(使用有状态的 API)。您要保持的状态BatchingFn
是行的当前缓冲区或self._lines
. 抱歉,我是在python
(不熟悉 Java API)中做到的
from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
MY_BATCH_SIZE = 512
class BatchingFn(DoFn):
def __init__(self, batch_size=100):
self._batch_size = batch_size
def start_bundle(self):
# buffer for string of lines
self._lines = []
def process(self, element):
# Input element is a string (representing a CSV line)
self._lines.append(element)
if len(_lines) >= self._batch_size:
self._flush_batch()
def finish_bundle(self):
# takes care of the unflushed buffer before finishing
if self._lines:
self._flush_batch()
def _flush_batch(self):
#### Do your REST API call here with self._lines
# .....
# Clear the buffer.
self._lines = []
# pcoll is your PCollection of lines.
(pcoll | 'Call Rest API with batch data' >> ParDo(BatchingFn(MY_BATCH_SIZE)))
关于使用Data-driven triggers,可以参考Beam/Dataflow 中的 Batch PCollection。