首页 > 解决方案 > 如何在 Apache Beam 中实现滚动窗口(关键级别)?

问题描述

我正在使用带有 python SDK 的 Apache Beam 来构建流数据的数据管道(在我的例子中是 google PubSub)。

考虑这种情况: 在此处输入图像描述 假设事件时间〜处理时间(即没有迟到的事件)

现在对于这个快照,我想计算两组特征:

对于第一种情况(每个用户的累积计数),我编写了以下代码并且它运行良好:

    with beam.Pipeline(options = options) as p:
    data = (
    p | 'read data' >> ReadFromPubSub(subscription=SUB_PATH)
      | 'add timestamp' >> beam.ParDo(AddTimestamp())
      | 'parse data' >> beam.ParDo(ParseEvent())  
    )
    
    moving_sum = (
    data 
        | 'global_window' >> beam.WindowInto(windowfn = beam.window.GlobalWindows(),
                                             trigger=beam.trigger.Repeatedly(beam.trigger.AfterCount(1)),
                                             accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING)
        | 'extract amount' >> beam.ParDo(CollectAmount())
        | 'sum' >> beam.CombinePerKey(sum)
        | 'convert to dict' >> beam.ParDo(Convert2Dict())
        | 'cumsum_to_pqsl' >> relational_db.Write(source_config=source_config,table_config=table_config2)   
    )
    

对于第二组功能,我无法弄清楚每当为该用户读取新事件时,该逻辑将在过去 60 秒内为每个用户提供事件数。这个用例不同于固定窗口和滑动窗口,这里窗口(60 秒)取决于事件。

对此的任何帮助将不胜感激。

标签: pythonspark-streamingapache-flinkapache-beam

解决方案


内置的 Beam 窗口 fns 都不支持这种窗口。相反,您可以考虑使用state 和 timers。将每个元素存储在状态中,在每个元素进入后设置一个定时器 60 秒,并在定时器触发时读取状态以汇总相关事件。


推荐阅读