首页 > 解决方案 > kafka增量聚合

问题描述

delta在 kafka 主题中有一个数字流,需要以特殊方式聚合,即:

aggregate[0] = 0
aggregate[N] = aggregate[N-1] * (N - 1) / N + delta[N - 1] / N

(确切的公式无关紧要,但请注意对前一个元素的依赖aggregate

本质上,我需要同时订阅两个kafka主题,我同时在两个主题中前进:当我阅读delta主题中的一个项目时,我还需要从主题中读取相应的项目aggregate,并写入结果进入aggregate主题,在主题中的下一个项目delta被消费之前。

这在kafka中是否有可能?可以通过巧妙的连接帮助 ksql 吗?

标签: apache-kafkaksqldb

解决方案


我想知道我的伪代码是否有帮助。假设有两个主题,“delta”和“aggregate”。并且两个主题的分区都是 1 以简化情况(以便我们获得全局顺序)

# this is just pseudocode to show my thoughts
def demo():
    delta_consumer = Consumer("delta")
    aggregate_consumer = Consumer("aggregate")
    aggregate_producer = Producer("aggregate")

    is_pre_aggregate_result_exists = aggregate_consumer.get_offset() != 0 # check whether it's first running 
    for delta_data in delta_consumer.poll():
        if not is_pre_aggregate_result_exists:
            last_aggregate_result = 0
        else:
            last_aggregate_result = aggregate_consumer.get_last_record()
        new_aggregate_result = user_define_function(delta_data, last_aggregate_result)
        aggregate_producer.producer(new_aggregate_result)
        is_pre_aggregate_result_exists = True

同时,我想 kafka+structurd-steaming 可以解决您的问题,因为您的问题的内部需要是在流表上获取聚合结果,然后将结果输出到 kafka 主题,其中 kafka+structured-steraming 是一个完美的解决方案。


推荐阅读