首页 > 解决方案 > 如何使用 ksql 在 Kafka 中的时间窗口上在聚合之上执行聚合

问题描述

我有一堆防火墙数据。我想:

A) 将每个 IP 每小时的字节数相加,然后

B) 计算该小时内所有 IP 的最小和最大总和

我已经能够在 Kafka 中做 A,但是,我不知道如何做 B。我一直在研究文档,感觉自己快要接近了,但我似乎总是只找到解决方案的一部分。

我的 firewall_stream 运行良好。

client.create_stream(
    table_name='firewall_stream',
    columns_type=['src_ip VARCHAR',
                  'dst_ip VARCHAR',
                  'src_port INTEGER',
                  'dst_port INTEGER',
                  'protocol VARCHAR',
                  'action VARCHAR',
                  'timestamp VARCHAR',
                  'bytes BIGINT',
    ],
    topic='firewall',
    value_format='JSON'
)

我创建了物化视图 bytes_sent,滚动窗口为 1 小时,总和(字节)并按 IP 地址分组。这很好用!

client.ksql('''
CREATE TABLE bytes_sent as
  SELECT src_ip, sum(bytes) as bytes_sum
  FROM firewall_stream
  GROUP BY src_ip
  EMIT CHANGES
''')

这就是我卡住的地方。首先,我尝试从 bytes_sent 创建另一个物化视图,该视图做了一个 max(bytes_sum) 组,windowstart但我得到一个错误,你不能在窗口化的物化视图上进行聚合。

所以然后我删除了时间窗口(我想我会在第二个物化视图中重新打开它),但是我的“group by”子句没有任何字段。在 Postgres 中,我可以在没有 group by 的情况下进行 max,它会在整个表格中计算它,但 Kafka 总是需要那个 group by。现在我不确定要使用什么。

似乎无法与文档中的窗口表进行连接(尽管我没有尝试过并且可能会被误解)。

我唯一的另一个猜测是从该物化视图 bytes_sent 创建另一个流并查看更改日志事件,然后以某种方式将它们转换为给定时间窗口内所有 IP 的最大字节数。

任何有关如何解决此问题的反馈将不胜感激!!

标签: apache-kafkaapache-kafka-streamsksqldb

解决方案


简短的回答是你目前不能这样做。

我的快速解决方案基本上是:

  • 创建一个主题作为第一个聚合的输出
  • 在该新主题上创建一个新流,但在 ksql 之外
  • 在第二个上运行新的 ksql 聚合

也就是说,在这种设置中可能会出现很多问题。在这一点上,我们只是为这个特定的用例排除了 ksql,并且可能会直接使用流。


推荐阅读