首页 > 解决方案 > 以元组顺序展平 PCollection

问题描述

我正在尝试使用 Apache 梁中的 Flatten 函数添加标题。但是,似乎没有办法根据文档设置顺序:https ://beam.apache.org/documentation/sdks/pydoc/2.4.0/apache_beam.transforms.core.html?highlight= flatten#apache_beam.transforms.core.Flatten

有时标头位于数据的末尾,而其他的位于顶部。有没有办法设置顺序?想知道我是否缺少某些东西。

with beam.Pipeline(options=options) as p:


  header = [
      ('name', 'number'),
  ]
  phones_list = [
      ('amy', '111-222-3333'),
      ('james', '222-333-4444'),
      ('amy', '333-444-5555'),
      ('carl', '444-555-6666'),
  ]

  header = p | 'Header' >> beam.Create(header)
  phones = p | 'CreatePhones' >> beam.Create(phones_list)  

  merged = ((phones,header)
            | 'MergedPColl' >> beam.Flatten())

  output = merged

  output | 'Write' >> beam.io.WriteToText('./_output')

输出 1:

('amy', '111-222-3333')
('james', '222-333-4444')
('amy', '333-444-5555')
('carl', '444-555-6666')
('name', 'number')

输出 2:

('name', 'number')
('amy', '111-222-3333')
('james', '222-333-4444')
('amy', '333-444-5555')
('carl', '444-555-6666')

标签: pythonapache-beam

解决方案


Flatten是一个适用于 PCollections 的变压器。为了使合并工作并行进行,我认为他们不能保证保留顺序;这与生成的 PCollection 的无序性质一致。

但是,如果您的唯一目的是在顶部添加标题,则可以header使用textio.WriteToText().

> header (str): 写在文件开头作为标题的字符串。如果没有 :data:None并且设置了append_trailing_newlines,则将添加 `\n`。

phones | 'Write' >> beam.io.WriteToText(
  # Feel free to make your own header format.
  './_output', header="('name', 'number')")

更一般地,为了保留原始输入的序列,我会用序列号来增加输入数据。在梁的并行转换(携带每个元素的序列号)之后,您始终可以通过对该序列号进行排序作为后处理步骤(在非并行模式下)来“恢复”原始顺序。


推荐阅读