apache-kafka - 用 KStream 语义重组
问题描述
使用 kafka-streams,我想通过某个键S
对元素流进行分组,同时将同一键的所有值聚合到一个连接的结果中。这会产生一个 KTable 。E
K1
AGG
T1
根据聚合结果,该值应重新分区到另一个 KTableT2
中,按K2
从聚合结果中获取的键分组AGG
。所以聚合结果应该为下一次重组生成密钥。
最后,我只对T2
键K2
和值的 KTable 感兴趣AGG
但是,这不起作用。我只得到最后一个值的 KTable T
。不是每个键的值K2
我知道聚合的结果只是在一段时间后才转发,所以我已经尝试降低commit.interval.ms
到 1 但无济于事。
我还尝试使用through
中间结果并将其写入流,但这也没有成功。
val finalTable = streamBuilder.kstream("streamS")
.groupBy{ k, v -> createKey1(k, v) }
.aggregate(
{ Agg.empty() },
{ k, v, previousAgg ->
Agg.merge(previousAgg, v)
})
.toStream()
// .through("table1")
.groupBy { k1, agg -> agg.createKey2()}
.reduce{ _, agg -> agg }
对于S
包含以下值的流:
key1="123", id="1", startNewGroup="false"
key1="234", id="2", startNewGroup="false"
key1="123", id="3", startNewGroup="false"
key1="123", id="4", startNewGroup="true"
key1="234", id="5", startNewGroup="false"
key1="123", id="6", startNewGroup="false"
key1="123", id="7", startNewGroup="false"
key1="123", id="8", startNewGroup="true"
我希望最终结果是具有以下最新键值的 KTable:
key: 123-1, value: 'key1="123", key2="123-1", ids="1,3"'
key: 234-2, value: 'key1="234", key2="234-2", ids="2,5"'
key: 123-4, value: 'key1="123", key2="123-4", ids="4,6,7"'
key: 123-8, value: 'key1="123", key2="123-8", ids="8"'
原始S
元素流首先按key1
聚合结果包含 groupby 键的位置进行分组,并添加一个包含与第一次出现的组合key1
的额外字段。
每当聚合收到一个设置为的值时,它就会返回一个字段设置为和 的聚合,从而有效地创建一个新的子组。
在第二次重组中,我们简单地按字段分组。key2
key1
id
startNewGroup
true
key2
key1
id
key2
然而,我们真正观察到的是以下内容:
key: 234-2, value: 'key1="234", key2="234-2", ids="2,5"'
key: 123-8, value: 'key1="123", key2="123-8", ids="8"'
解决方案
对于您的用例,最好使用Processor API。处理器 API 可以很容易地与 Kafka Streams DSL(处理器 API 集成)结合使用。
您必须实现 Custom Transformer,它将为状态存储中的特定键聚合您的消息。当startNewGroup=true
消息到达时,键的旧消息将被转发到下游并开始新的聚合
您的 Sample Transformer 可能如下所示:
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore
case class CustomTransformer(storeName: String) extends Transformer[String, Value, Agg] {
private var stateStore: KeyValueStore[String, Agg] = null
private var context: ProcessorContext = null
override def init(context: ProcessorContext): Unit = {
this.context = context
stateStore = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, Agg]]
}
override def transform(key: String, value: Value): Agg = {
val maybeAgg = Option(stateStore.get(key))
if (value.startNewGroup) {
maybeAgg.foreach(context.forward(key, _))
stateStore.put(key, Agg(value))
}
else
stateStore.put(key, maybeAgg.map(_.merge(value)).getOrElse(Agg(value)))
null
}
override def close(): Unit = {}
}
推荐阅读
- excel - 如何在同一模块中调用 vba 函数?
- ios - SQLite db 在真实 iOS 设备中创建错误
- javascript - 如何在 highcarts 的条形图竞赛中为数据标签值添加逗号?
- firebase - 如何修复“错误加载文档”Firestore
- button - 如何在 panel builder 600 中设置按钮密码保护?
- python - 散景。如何匹配多个y轴的比例?
- java - 从用户扫描 10 个整数
- python - 我需要为我的数据集实现肘法以找到 k-means 聚类所需的最佳聚类
- xml - 为什么它在本地工作正常并且服务器获取 groovy.util.slurpersupport.Attributes 无法转换为 java.util.Map 错误?
- vuejs3 - 无法解析组件?