首页 > 解决方案 > Kafka Streams Aggregator 中的访问记录偏移量

问题描述

我有一个简单的窗口拓扑:

builder.stream("input-topic", Consumed.with(...))
    .groupByKey()
    .windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
    .aggregate(Frame::new,
            this::windowAggregator,
            ...
    )
    .transformValues(FrameTransformer::new)
    .toStream()
    .selectKey((key, value) -> value...)
    .to("output-topic", Produced.with(...));

我想将窗口开头的实际记录偏移量放入 Frame 聚合对象中。

如何从windowAggregator( aggregate()handler) 函数访问记录偏移量?

我知道我可以访问 中的记录偏移量FrameTransformer,但这并不能帮助我创建准确Frame的对象来描述我的窗口的开始和结束偏移量。

我听说有一种方法可以通过.transform()在 之前插入另一个调用,在groupByKey()那里我可以访问偏移量,但是我需要修改我的事件记录的架构以将偏移量信息存储在里面。

有没有(更简单)的方法可以实现我的意图?

更新

事实上,我能够通过Frame以下方式获得对象中准确的窗口开始和结束偏移量

builder.stream("input-topic", Consumed.with(...))
    .transformValues(EventTransformer::new)
    .groupByKey()
    .windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
    .aggregate(Frame::new,
            this::windowAggregator,
            ...
    )
    .toStream()
    .selectKey((key, value) -> value...)
    .to("output-topic", Produced.with(...));

但如上所述,以编辑Event对象的模式为代价。

标签: apache-kafka-streams

解决方案


如何从 windowAggregator(aggregate() 处理程序)函数访问记录偏移量?

你不能。您transformValues()在聚合之前使用的方法(并丰富Event对象是正确的方法。

有人提议扩展 API 以允许访问内部aggregate()和其他 DSL 运营商的记录元数据,但它从未被推过终点线(参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-159% 3A+Introducing+Rich+functions+to+Streams)。


推荐阅读