Assigning a PCollection back to the global window

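Problem description

I have a pipeline that takes a bounded PCollection, assigns timestamps to its elements, and windows it into sliding windows. After a grouping transform, I want to assign the resulting PCollection back to the global window. I have not been able to figure out how to do this. See the example Beam pseudocode below: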

import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.io.ReadFromText()
        | beam.ParDo(AddTimestampDoFn())
        | beam.WindowInto(beam.window.SlidingWindows(60, 60))
        | beam.GroupByKey()
        | beam.ParDo(SomethingElse())
        | beam.WindowInto(GlobalWindow()) # Here is where I want to bring back to global window
    )

Any ideas on how to do this?

Tags: python, apache-flink, google-cloud-dataflow, apache-beam

Solution


Using beam.WindowInto(window.GlobalWindows()) should work. For example, with this quick test:

import time

import apache_beam as beam
from apache_beam import window

data = [{'message': 'Hi', 'timestamp': time.time()}]

# p is an existing beam.Pipeline; DebugPrinterFn is defined below.
events = (p
  | 'Create Events' >> beam.Create(data)
  | 'Add Timestamps' >> beam.Map(lambda x: window.TimestampedValue(x, x['timestamp']))
  | 'Sliding Windows' >> beam.WindowInto(window.SlidingWindows(60, 60))
  | 'First window' >> beam.ParDo(DebugPrinterFn())
  | 'global Window' >> beam.WindowInto(window.GlobalWindows())
  | 'Second window' >> beam.ParDo(DebugPrinterFn()))

where DebugPrinterFn prints the window information:

import logging

class DebugPrinterFn(beam.DoFn):
  """Just prints the element and window"""
  def process(self, element, window=beam.DoFn.WindowParam):
    logging.info("Received message %s in window=%s", element['message'], window)
    yield element

I get the following output:

INFO:root:Received message Hi in window=[1575565500.0, 1575565560.0)
INFO:root:Received message Hi in window=GlobalWindow
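
To see where the first window's bounds come from, here is a rough pure-Python sketch of how a sliding window function with a given size and period maps a timestamp to windows. It is an illustration only, not Beam's own code: the function name sliding_windows and the sample timestamp 1575565530 are assumptions made up for this example (the actual time.time() value from the test run is not known).

```python
def sliding_windows(timestamp, size, period, offset=0):
    """Return the [start, end) windows that contain `timestamp`.

    Hypothetical helper for illustration; it mirrors the usual
    sliding-window assignment rule, not Beam's actual implementation.
    """
    # Latest window start that is aligned to `period` and <= timestamp.
    start = timestamp - ((timestamp - offset) % period)
    windows = []
    # Walk backwards one period at a time while the window still covers
    # the timestamp (i.e. start + size > timestamp).
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= period
    return windows


# Assumed sample timestamp, chosen to fall inside the window logged above.
print(sliding_windows(1575565530, size=60, period=60))
# Overlapping case: size 60, period 30 puts each element in two windows.
print(sliding_windows(75, size=60, period=30))
```

With the period equal to the size, as in SlidingWindows(60, 60), each element lands in exactly one window, so the windows behave like fixed 60-second windows. After WindowInto(window.GlobalWindows()), every element is in the single global window regardless of its timestamp.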

Tested with the DirectRunner and the 2.16.0 SDK. If it does not work for you:

  • Do you get any errors?
  • Which runner and SDK are you using?

The full code is here
