apache-kafka - kafka增量聚合
问题描述
我delta
在 kafka 主题中有一个数字流,需要以特殊方式聚合,即:
aggregate[0] = 0
aggregate[N] = aggregate[N-1] * (N - 1) / N + delta[N - 1] / N
(确切的公式无关紧要,但请注意对前一个元素的依赖aggregate
)
本质上,我需要同时订阅两个kafka主题,我同时在两个主题中前进:当我阅读delta
主题中的一个项目时,我还需要从主题中读取相应的项目aggregate
,并写入结果进入aggregate
主题,在主题中的下一个项目delta
被消费之前。
这在kafka中是否有可能?可以通过巧妙的连接帮助 ksql 吗?
解决方案
我想知道我的伪代码是否有帮助。假设有两个主题,“delta”和“aggregate”。并且两个主题的分区都是 1 以简化情况(以便我们获得全局顺序)
# this is just pseudocode to show my thoughts
def demo():
delta_consumer = Consumer("delta")
aggregate_consumer = Consumer("aggregate")
aggregate_producer = Producer("aggregate")
is_pre_aggregate_result_exists = aggregate_consumer.get_offset() != 0 # check whether it's first running
for delta_data in delta_consumer.poll():
if not is_pre_aggregate_result_exists:
last_aggregate_result = 0
else:
last_aggregate_result = aggregate_consumer.get_last_record()
new_aggregate_result = user_define_function(delta_data, last_aggregate_result)
aggregate_producer.producer(new_aggregate_result)
is_pre_aggregate_result_exists = True
同时,我想 kafka+structurd-steaming 可以解决您的问题,因为您的问题的内部需要是在流表上获取聚合结果,然后将结果输出到 kafka 主题,其中 kafka+structured-steraming 是一个完美的解决方案。
推荐阅读
- youtube - 是否可以使用 YouTube 数据 API 查看某个 YouTube 视频在给定年份的观看次数?
- c++ - 如何打印此代码中小数点后四位的结果?
- docusignapi - DocuSign SOAP - 如何将外部链接添加到信封和电子邮件退回
- assembly - 用另一个寄存器 ARMv8 中的值偏移索引
- css - CSS Flex - 调整 - 等行
- spring-boot - 无法调用“com.fasterxml.jackson.databind.JsonNode.numberValue()”
- c - С - 为什么 if() 语句中的 scanf() 工作不正确?
- python - 使用 pandas 在多列中应用 IF 条件
- r - 从 R 中的离散列创建 4 个新数据帧
- ios - SwiftUI 如何通过扩展 TextField 自动滚动 ScrollView?