python - 将 PCollection 分配回全局窗口
问题描述
我有一个管道,它采用有界 PCollection,为其分配时间戳并将其“窗口化”到滑动窗口中。分组转换后,我想将生成的 PCollection 分配回全局窗口。我一直无法弄清楚如何做到这一点。请参阅下面的示例光束伪代码:
import apache_beam as beam
with beam.Pipeline() as p:
(
p
| beam.io.ReadFromText()
| beam.ParDo(AddTimestampDoFn())
| beam.WindowInto(beam.window.SlidingWindows(60, 60))
| beam.GroupByKey()
| beam.ParDo(SomethingElse()
| beam.WindowInto(GlobalWindow()) # Here is where I want to bring back to global window
)
关于如何去做的任何想法?
解决方案
使用beam.WindowInto(window.GlobalWindows())
应该工作。例如,通过这个快速测试:
data = [{'message': 'Hi', 'timestamp': time.time()}]
events = (p
| 'Create Events' >> beam.Create(data) \
| 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
| 'Sliding Windows' >> beam.WindowInto(beam.window.SlidingWindows(60, 60)) \
| 'First window' >> beam.ParDo(DebugPrinterFn()) \
| 'global Window' >> beam.WindowInto(window.GlobalWindows()) \
| 'Second window' >> beam.ParDo(DebugPrinterFn()))
其中DebugPrinterFn
打印窗口信息:
class DebugPrinterFn(beam.DoFn):
"""Just prints the element and window"""
def process(self, element, window=beam.DoFn.WindowParam):
logging.info("Received message %s in window=%s", element['message'], window)
yield element
我得到以下输出:
INFO:root:Received message Hi in window=[1575565500.0, 1575565560.0)
INFO:root:Received message Hi in window=GlobalWindow
使用DirectRunner
2.16.0 SDK 测试。如果它不适合您:
- 你有什么错误吗?
- 您使用的是哪个运行器和 SDK?
完整代码在这里
推荐阅读
- sql - 在尝试更改 SQL 中的表时,我收到一条错误消息,提示在新的更改操作开始之前缺少逗号
- reactjs - 是否可以从其他商店修改商店?
- javascript - 分组/嵌套获取函数
- python-pptx - 您如何从条形图或数据标签中获取左值、右值、底部值和顶部值
- powershell - 将停止和启动服务的时间戳打印到 .txt 文件中
- angular - 如何在 Angular/TypeScript 中使用 flatpickr rangePlugin
- java - 支持文本字段中的 RTL(阿拉伯语)
- jquery - 多个 (8) Ajax 请求未同时获取内容
- java - JavaCard 3 Eclipse 转换器错误,找不到包 org.globalplatform 的导出文件 globalplatform.exp
- c++ - 如何将模板从 C++ 转换为 C