apache-kafka - Kafka Streams:不重新分区共同分区数据的地图
问题描述
我有一个来自类型为 [K3, V] 的基础主题的 KStream。K3是由三个字段组成的密钥,即K3(a,b,c)。然而,主题仅由键字段的子集划分,即 K2 (a,b)。
现在,我想创建一个 KTable 来连接并在我的 PAPI 处理器中使用。我希望这个 KTable 按 K2(a,b) 聚合。聚合只是将值收集到一个集合中。
为此,我必须使用“map”函数将我的密钥从 K3 转换为 K2。这将(尝试)通过创建新的重新分区主题来重新分区数据(尽管实际上数据将保留在相同的分区中,因为它还将使用 K2 作为分区键),请参阅下面拓扑中的“test-customerStoreName-repartition”。
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test-customerz])
--> KSTREAM-MAP-0000000003
Processor: KSTREAM-MAP-0000000003 (stores: [])
--> KSTREAM-FILTER-0000000006
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-FILTER-0000000006 (stores: [])
--> KSTREAM-SINK-0000000005
<-- KSTREAM-MAP-0000000003
Sink: KSTREAM-SINK-0000000005 (topic: test-customerStoreName-repartition)
<-- KSTREAM-FILTER-0000000006
有没有一种方法可以在不必通过地图重新分区的情况下进行这种聚合?
解决方案
使用 DSL,这是不可能的,因为您无法告诉库不需要重新分区。有一个 KIP 建议添加这样的功能:https ://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
您需要直接使用处理器 API,因为处理器 API 没有任何自动重新分区。
您还可以“破解”某些东西:在map()
返回的KStream
可以转换为KStreamImpl
类型之后,然后通过反射您可以将内部标志设置repartitionRequired
为false
. 但这是一个黑客!
推荐阅读
- git - 如何在不提交/添加任何文件或修改任何文件的情况下检查远程 GIT 存储库权限
- mysql - 我在 SQL 查询中得到错误的结果
- regex - 正则表达式匹配 2-8 组中的十六进制
- git - git 与特定提交合并
- java - Spring Boot 页面反序列化 - PageImpl 无构造函数
- git - 尝试推送到 Azure Devops 时出现“无法解析远程解包状态”错误
- c# - 发布 C# winform 应用程序时出错
- javascript - 基于数组视图中的选定项渲染组件
- html - CSS/HTML - 当自动边距似乎不起作用时,我如何才能居中?
- python - 使用 Networkx - Python 的图形中不显示边