apache-kafka - 如何使用 ksql 在 Kafka 中的时间窗口上在聚合之上执行聚合
问题描述
我有一堆防火墙数据。我想:
A) 将每个 IP 每小时的字节数相加,然后
B) 计算该小时内所有 IP 的最小和最大总和
我已经能够在 Kafka 中做 A,但是,我不知道如何做 B。我一直在研究文档,感觉自己快要接近了,但我似乎总是只找到解决方案的一部分。
我的 firewall_stream 运行良好。
client.create_stream(
table_name='firewall_stream',
columns_type=['src_ip VARCHAR',
'dst_ip VARCHAR',
'src_port INTEGER',
'dst_port INTEGER',
'protocol VARCHAR',
'action VARCHAR',
'timestamp VARCHAR',
'bytes BIGINT',
],
topic='firewall',
value_format='JSON'
)
我创建了物化视图 bytes_sent,滚动窗口为 1 小时,总和(字节)并按 IP 地址分组。这很好用!
client.ksql('''
CREATE TABLE bytes_sent as
SELECT src_ip, sum(bytes) as bytes_sum
FROM firewall_stream
GROUP BY src_ip
EMIT CHANGES
''')
这就是我卡住的地方。首先,我尝试从 bytes_sent 创建另一个物化视图,该视图做了一个 max(bytes_sum) 组,windowstart
但我得到一个错误,你不能在窗口化的物化视图上进行聚合。
所以然后我删除了时间窗口(我想我会在第二个物化视图中重新打开它),但是我的“group by”子句没有任何字段。在 Postgres 中,我可以在没有 group by 的情况下进行 max,它会在整个表格中计算它,但 Kafka 总是需要那个 group by。现在我不确定要使用什么。
似乎无法与文档中的窗口表进行连接(尽管我没有尝试过并且可能会被误解)。
我唯一的另一个猜测是从该物化视图 bytes_sent 创建另一个流并查看更改日志事件,然后以某种方式将它们转换为给定时间窗口内所有 IP 的最大字节数。
任何有关如何解决此问题的反馈将不胜感激!!
解决方案
简短的回答是你目前不能这样做。
我的快速解决方案基本上是:
- 创建一个主题作为第一个聚合的输出
- 在该新主题上创建一个新流,但在 ksql 之外
- 在第二个上运行新的 ksql 聚合
也就是说,在这种设置中可能会出现很多问题。在这一点上,我们只是为这个特定的用例排除了 ksql,并且可能会直接使用流。
推荐阅读
- android-studio - Android Studio 占用大量内存
- javascript - React - 将道具传递给孩子
- php - 第一个 PHP 表单似乎不起作用
- python - 未能在我的网络抓取工具中执行“显示更多评论”
- python - 如何在 Pandas 中读取 Excel,保持没有 NaN 的混合类型列?
- github - 在合并之前请求用户进行(某种)流程验证
- jenkins - 詹金斯文件 | 多分支管道
- ruby-on-rails - Rails API / react / axios 下载损坏的文件
- sql-server - 页面拆分时的读取一致性
- react-native - 在 React Native 中停止 setInterval