python - 如何在 Apache Beam 中实现滚动窗口(关键级别)?
问题描述
我正在使用带有 python SDK 的 Apache Beam 来构建流数据的数据管道(在我的例子中是 google PubSub)。
现在对于这个快照,我想计算两组特征:
- 每个用户的累积计数(在每个事件中触发)
- 过去 60 秒内的事件数(在每个事件中触发)
对于第一种情况(每个用户的累积计数),我编写了以下代码并且它运行良好:
with beam.Pipeline(options = options) as p:
data = (
p | 'read data' >> ReadFromPubSub(subscription=SUB_PATH)
| 'add timestamp' >> beam.ParDo(AddTimestamp())
| 'parse data' >> beam.ParDo(ParseEvent())
)
moving_sum = (
data
| 'global_window' >> beam.WindowInto(windowfn = beam.window.GlobalWindows(),
trigger=beam.trigger.Repeatedly(beam.trigger.AfterCount(1)),
accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING)
| 'extract amount' >> beam.ParDo(CollectAmount())
| 'sum' >> beam.CombinePerKey(sum)
| 'convert to dict' >> beam.ParDo(Convert2Dict())
| 'cumsum_to_pqsl' >> relational_db.Write(source_config=source_config,table_config=table_config2)
)
对于第二组功能,我无法弄清楚每当为该用户读取新事件时,该逻辑将在过去 60 秒内为每个用户提供事件数。这个用例不同于固定窗口和滑动窗口,这里窗口(60 秒)取决于事件。
对此的任何帮助将不胜感激。
解决方案
内置的 Beam 窗口 fns 都不支持这种窗口。相反,您可以考虑使用state 和 timers。将每个元素存储在状态中,在每个元素进入后设置一个定时器 60 秒,并在定时器触发时读取状态以汇总相关事件。
推荐阅读
- python - 我无法使用 pyplot 显示图表
- python - Scipy.optimize 中的多个变量最小化
- reactjs - 如何在 React JS 中制作自定义选择下拉菜单
- sql - 如何解决“未在预期位置找到 FROM 关键字”错误
- mysql - 处理 MYSQL 表中的并行相同插入
- vba - vb中的交叉点
- kubernetes - Helm卸载后如何在预安装挂钩和删除中创建秘密
- flutter - Flutter 的“audioplayers”依赖问题
- python - 领先的 mino Python 错误任何人都可以帮助我
- android - 如何摆脱 .apk 文件中不必要的代码?