apache-kafka-streams - Kafka Streams Aggregator 中的访问记录偏移量
问题描述
我有一个简单的窗口拓扑:
builder.stream("input-topic", Consumed.with(...))
.groupByKey()
.windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
.aggregate(Frame::new,
this::windowAggregator,
...
)
.transformValues(FrameTransformer::new)
.toStream()
.selectKey((key, value) -> value...)
.to("output-topic", Produced.with(...));
我想将窗口开头的实际记录偏移量放入 Frame 聚合对象中。
如何从windowAggregator
( aggregate()
handler) 函数访问记录偏移量?
我知道我可以访问 中的记录偏移量FrameTransformer
,但这并不能帮助我创建准确Frame
的对象来描述我的窗口的开始和结束偏移量。
我听说有一种方法可以通过.transform()
在 之前插入另一个调用,在groupByKey()
那里我可以访问偏移量,但是我需要修改我的事件记录的架构以将偏移量信息存储在里面。
有没有(更简单)的方法可以实现我的意图?
更新
事实上,我能够通过Frame
以下方式获得对象中准确的窗口开始和结束偏移量
builder.stream("input-topic", Consumed.with(...))
.transformValues(EventTransformer::new)
.groupByKey()
.windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
.aggregate(Frame::new,
this::windowAggregator,
...
)
.toStream()
.selectKey((key, value) -> value...)
.to("output-topic", Produced.with(...));
但如上所述,以编辑Event
对象的模式为代价。
解决方案
如何从 windowAggregator(aggregate() 处理程序)函数访问记录偏移量?
你不能。您transformValues()
在聚合之前使用的方法(并丰富Event
对象是正确的方法。
有人提议扩展 API 以允许访问内部aggregate()
和其他 DSL 运营商的记录元数据,但它从未被推过终点线(参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-159% 3A+Introducing+Rich+functions+to+Streams)。
推荐阅读
- c - 运行程序后imac死机
- python - 从网站提取数据时如何绕过验证码。我从 https://jp.indeed.com/ 中提取
- sensors - ESP32 模数 ADC
- html - 将鼠标悬停在我的卡片上并不能覆盖完整的卡片
- java - 如何在 DynamoDB 中使用包含函数和 DynamoDBMapper Java?
- ios - 无法将字符串转换为 NSDate Swift
- sql - sql查询不包括当前日期的数据
- python - Python 的海龟模块 - 我如何让海龟根据它下面的标记颜色做一个动作?
- python - 如何使用 for 循环将不同的变量分配给类对象?
- javascript - 如何在 nodejs 上使用 imap-simple 和 mailparser 按发件人过滤邮件?