python - SlidingWindows Python Apache Beam 复制数据
问题描述
问题
每次系统从 pubsub 收到带有滑动窗口的消息时,它都会被复制
编码
| 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))
| 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING)
| 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
输出
如果我只从 pub/sub 发送一条消息并尝试在滑动窗口完成后打印我所拥有的代码:
class print_row2(beam.DoFn):
def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam,timestamp=beam.DoFn.TimestampParam):
print row, timestamp2str(float(window.start)), timestamp2str(float(window.end)),timestamp2str(float(timestamp))
结果
('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000
('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000
'window' >> beam.WindowInto(window.SlidingWindows(30, 15))
如果我在只收到一次之前打印消息
“图形模式下的过程:
time: ----t+00---t+15---t+30----t+45----t+60------>
: : : : :
w1: |=X===========| : :
w2: |==============| :
...
消息 X 在滑动窗口开始时只发送了一次,它应该只接收一次,但已经接收了两次
我已经尝试使用两个 AccumulationMode 值,也使用 trigger=AftyerWatermark 但我无法解决问题。
有什么问题?
额外的
使用 FixedWindows,这是我的 porpouse 的正确代码:
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| 'Speed Average' >> beam.GroupByKey()
| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())
或者
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
解决方案
属于窗口的所有元素都被发射。如果一个元素属于多个窗口,它将在每个窗口中发出。
仅当您计划处理延迟数据/多次触发触发时,累积模式才重要。在这种情况下,当触发器再次触发时,丢弃模式只为您提供窗口中的新元素,即仅发出自上次触发触发以来到达同一窗口的元素,已经发出的元素不会再次发出并被丢弃。在累积模式下,每次触发触发都会发射整个窗口,它将包括上次发射的旧元素和此后到达的新元素。
如果我理解您的示例,您有滑动窗口,它们的长度为 30 秒,并且每 15 秒启动一次。所以它们重叠了 15 秒:
time: ----t+00---t+15---t+30----t+45----t+60------>
: : : : :
w1: |=============| : :
w2: |==============| :
w3: |===============|
...
因此,您案例中的任何元素都将属于至少两个窗口(第一个和最后一个窗口除外)。
例如,在您的示例中,如果您的消息是在 17:07:15 和 17:07:30 之间发送的,它将出现在两个窗口中。
固定窗口不重叠,因此元素只能属于一个窗口:
time: ----t+00---t+15---t+30----t+45----t+60------>
: : :
w1: |=============| :
w2: |===============|
w3: |====...
...
更多关于 Windows 的信息:https ://beam.apache.org/documentation/programming-guide/#windowing
推荐阅读
- r - 处理 R 中的特殊字体
- node.js - 为什么 MySQL 数据库结果没有返回?
- sql - dbms 输出不显示数据输出?
- javascript - rollup.js:如何执行给定的模块,并在构建时返回执行结果?
- django - 尝试执行“诗歌添加”时,我没有得到模块名称 cleo
- javascript - 尝试在 Discord.js 中发送 Axios 响应时无法发送空消息错误
- python - Python Pandas 根据多列条件替换值
- django - manage.py runserver 比我的 gunicorn/nginx conf 具有更好的性能是否正常?
- python - 检查一个数据框中的值是否存在于另一个数据框中并创建列
- html - CSS位置绝对停止旋转属性