首页 > 解决方案 > 使用 Faust 的滑动窗口

问题描述

有谁知道如何使用 Faust 实现滑动窗口?

这个想法是计算一个键在 10、30、60 和 300 秒窗口中的出现次数,但我们需要在 1 秒或每次更新的基础上进行计数。

我有一个狡猾的解决方法,这似乎非常低效,我有一个翻滚的 1 秒窗口,到期时间为 300 秒,然后我使用该delta()方法将表中的所有旧值与当前值相加。它似乎可以处理来自 6 个源的消息,每个源以 10 条消息/秒的速度运行,但这大约是我们看到滞后之前的限制。这显然是一种无法扩展的缓慢方法,所以问题是如何在不需要 KSQL 或设置 Spark 集群以及 Kafka 集群的情况下实现这一点。如果可以,我们会尽量保持简单。

更复杂的是,我们非常希望在过去 24 小时、1 周、1 个月和过去 3 个月内拥有相同的统计数据……所有这些都在运行中。但也许我们只是要求太多,而没有为每个输入提供专门的流程。

这是我的狡猾代码:

class AlarmCount(faust.Record, serializer='json'):
  event_id: int
  source_id: int
  counts_10: int
  counts_30: int
  counts_60: int
  counts_300: int

@app.agent(events_topic)
async def new_event(stream):
  async for value in stream:
    # calculate the count statistics
    counts_10=0
    counts_30=0
    counts_60=0
    counts_300=0
    
    event_counts_table[value.global_id] += 1
    
    for i in range(300):
      if(i<=10):
        counts_10+=event_counts_table[value.source_id].delta(i)
      if(i<=30):
        counts_30+=event_counts_table[value.source_id].delta(i)
      if(i<=60):
        counts_60+=event_counts_table[value.source_id].delta(i)
      if(i<=300):
        counts_300+=event_counts_table[value.source_id].delta(i)
    
    await event_counts_topic.send(
      value=EventCount(
        event_id=value.event_id,
        source_id=value.source_id,
        counts_10=counts_10,
        counts_30=counts_30,
        counts_60=counts_60,
        counts_300=counts_300
      )
    )

标签: apache-kafka-streamsfaustktable

解决方案


我想在所有窗口上进行迭代,以将最后一个值与所有其他过去值的平均值/偏差/其他聚合进行比较。

  • 就像是table[key].iter_windows()
  • 而不是循环所有.delta(i)

像你一样,我将实现一个带有时间戳列表的表。如果列表太大,它将是次优的,因为它会changelog很胖。我们应该只流式传输已修改的内容,而不是重复每个事件的所有列表。

因此,我将创建一个包含详细信息的短期列表和一个包含聚合的长期列表。然后,每个事件只会更新短期列表。


推荐阅读