apache-kafka - Reading the peek topic from Kafka Streams
Problem description
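I have a topic named push-processing-KSTREAM-PEEK-0000000014-repartition.
This is an internal Kafka topic. I did not create it. I use the .peek() method after repartitioning, and I call peek 3-4 times.
My question is this: I can read from the topic with topic read push-processing-KSTREAM-PEEK-0000000014-repartition, but I cannot read it when I run topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning.
Is this internal topic created because of the peek method? Or is it related to some other repartitioning code in my stream, even though its name is KSTREAM-PEEK?
It has 50 partitions. Since peek is a stateless operation, it should not create an internal topic, right? So why is its name related to peek, and why can't I read it from the beginning?
Any ideas, please?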
This is the first topology:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [appconnect_deviceIds_exported_for_push])
--> KSTREAM-FLATMAP-0000000004
Processor: KSTREAM-FLATMAP-0000000004 (stores: [])
--> KSTREAM-PEEK-0000000005
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-PEEK-0000000005 (stores: [])
--> KSTREAM-FILTER-0000000007
<-- KSTREAM-FLATMAP-0000000004
Processor: KSTREAM-FILTER-0000000007 (stores: [])
--> KSTREAM-SINK-0000000006
<-- KSTREAM-PEEK-0000000005
Sink: KSTREAM-SINK-0000000006 (topic: KSTREAM-PEEK-0000000005-repartition)
<-- KSTREAM-FILTER-0000000007
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000008 (topics: [KSTREAM-PEEK-0000000005-repartition])
--> KSTREAM-JOIN-0000000009
Source: KSTREAM-SOURCE-0000000028 (topics: [KSTREAM-PEEK-0000000025-repartition])
--> KSTREAM-JOIN-0000000029
Processor: KSTREAM-JOIN-0000000009 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
--> KSTREAM-MAP-0000000010
<-- KSTREAM-SOURCE-0000000008
Processor: KSTREAM-JOIN-0000000029 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
--> KSTREAM-PEEK-0000000030
<-- KSTREAM-SOURCE-0000000028
Processor: KSTREAM-MAP-0000000010 (stores: [])
--> KSTREAM-PEEK-0000000011
<-- KSTREAM-JOIN-0000000009
Processor: KSTREAM-PEEK-0000000030 (stores: [])
--> KSTREAM-MAP-0000000031
<-- KSTREAM-JOIN-0000000029
Processor: KSTREAM-MAP-0000000031 (stores: [])
--> KSTREAM-SINK-0000000032
<-- KSTREAM-PEEK-0000000030
Processor: KSTREAM-PEEK-0000000011 (stores: [])
--> KSTREAM-SINK-0000000012
<-- KSTREAM-MAP-0000000010
Source: KSTREAM-SOURCE-0000000002 (topics: [appconnect_device_stream])
--> KTABLE-SOURCE-0000000003
Sink: KSTREAM-SINK-0000000012 (topic: appconnect_devices_exported_for_push)
<-- KSTREAM-PEEK-0000000011
Sink: KSTREAM-SINK-0000000032 (topic: appconnect_devices_exported_for_push)
<-- KSTREAM-MAP-0000000031
Processor: KTABLE-SOURCE-0000000003 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
--> none
<-- KSTREAM-SOURCE-0000000002
Sub-topology: 2
Source: KSTREAM-SOURCE-0000000013 (topics: [appconnect_userIds_exported_for_push])
--> KSTREAM-FLATMAP-0000000017
Processor: KSTREAM-FLATMAP-0000000017 (stores: [])
--> KSTREAM-PEEK-0000000018
<-- KSTREAM-SOURCE-0000000013
Processor: KSTREAM-PEEK-0000000018 (stores: [])
--> KSTREAM-FILTER-0000000020
<-- KSTREAM-FLATMAP-0000000017
Processor: KSTREAM-FILTER-0000000020 (stores: [])
--> KSTREAM-SINK-0000000019
<-- KSTREAM-PEEK-0000000018
Sink: KSTREAM-SINK-0000000019 (topic: KSTREAM-PEEK-0000000018-repartition)
<-- KSTREAM-FILTER-0000000020
Sub-topology: 3
Source: KSTREAM-SOURCE-0000000021 (topics: [KSTREAM-PEEK-0000000018-repartition])
--> KSTREAM-JOIN-0000000022
Processor: KSTREAM-JOIN-0000000022 (stores: [appconnect_user_stream-STATE-STORE-0000000014])
--> KSTREAM-PEEK-0000000023
<-- KSTREAM-SOURCE-0000000021
Processor: KSTREAM-PEEK-0000000023 (stores: [])
--> KSTREAM-MAP-0000000024
<-- KSTREAM-JOIN-0000000022
Processor: KSTREAM-MAP-0000000024 (stores: [])
--> KSTREAM-PEEK-0000000025
<-- KSTREAM-PEEK-0000000023
Processor: KSTREAM-PEEK-0000000025 (stores: [])
--> KSTREAM-FILTER-0000000027
<-- KSTREAM-MAP-0000000024
Processor: KSTREAM-FILTER-0000000027 (stores: [])
--> KSTREAM-SINK-0000000026
<-- KSTREAM-PEEK-0000000025
Source: KSTREAM-SOURCE-0000000015 (topics: [appconnect_user_stream])
--> KTABLE-SOURCE-0000000016
Sink: KSTREAM-SINK-0000000026 (topic: KSTREAM-PEEK-0000000025-repartition)
<-- KSTREAM-FILTER-0000000027
Processor: KTABLE-SOURCE-0000000016 (stores: [appconnect_user_stream-STATE-STORE-0000000014])
--> none
<-- KSTREAM-SOURCE-0000000015
And this is the second one:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000017 (topics: [KSTREAM-PEEK-0000000014-repartition])
--> KSTREAM-JOIN-0000000018
Processor: KSTREAM-JOIN-0000000018 (stores: [appconnect_push_processing_submissions-STATE-STORE-0000000000])
--> KSTREAM-FILTER-0000000019
<-- KSTREAM-SOURCE-0000000017
Processor: KSTREAM-FILTER-0000000019 (stores: [])
--> KSTREAM-SINK-0000000020
<-- KSTREAM-JOIN-0000000018
Source: KSTREAM-SOURCE-0000000001 (topics: [appconnect_push_processing_submissions])
--> KTABLE-SOURCE-0000000002
Sink: KSTREAM-SINK-0000000020 (topic: appconnect_push_send_bulk)
<-- KSTREAM-FILTER-0000000019
Processor: KTABLE-SOURCE-0000000002 (stores: [appconnect_push_processing_submissions-STATE-STORE-0000000000])
--> none
<-- KSTREAM-SOURCE-0000000001
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000003 (topics: [appconnect_devices_exported_for_push])
--> KSTREAM-MAP-0000000007
Processor: KSTREAM-MAP-0000000007 (stores: [])
--> KSTREAM-PEEK-0000000008
<-- KSTREAM-SOURCE-0000000003
Processor: KSTREAM-PEEK-0000000008 (stores: [])
--> KSTREAM-FILTER-0000000010
<-- KSTREAM-MAP-0000000007
Processor: KSTREAM-FILTER-0000000010 (stores: [])
--> KSTREAM-SINK-0000000009
<-- KSTREAM-PEEK-0000000008
Sink: KSTREAM-SINK-0000000009 (topic: KSTREAM-PEEK-0000000008-repartition)
<-- KSTREAM-FILTER-0000000010
Sub-topology: 2
Source: KSTREAM-SOURCE-0000000011 (topics: [KSTREAM-PEEK-0000000008-repartition])
--> KSTREAM-LEFTJOIN-0000000012
Processor: KSTREAM-LEFTJOIN-0000000012 (stores: [appconnect_user_stream-STATE-STORE-0000000004])
--> KSTREAM-KEY-SELECT-0000000013
<-- KSTREAM-SOURCE-0000000011
Processor: KSTREAM-KEY-SELECT-0000000013 (stores: [])
--> KSTREAM-PEEK-0000000014
<-- KSTREAM-LEFTJOIN-0000000012
Processor: KSTREAM-PEEK-0000000014 (stores: [])
--> KSTREAM-FILTER-0000000016
<-- KSTREAM-KEY-SELECT-0000000013
Processor: KSTREAM-FILTER-0000000016 (stores: [])
--> KSTREAM-SINK-0000000015
<-- KSTREAM-PEEK-0000000014
Source: KSTREAM-SOURCE-0000000005 (topics: [appconnect_user_stream])
--> KTABLE-SOURCE-0000000006
Sink: KSTREAM-SINK-0000000015 (topic: KSTREAM-PEEK-0000000014-repartition)
<-- KSTREAM-FILTER-0000000016
Processor: KTABLE-SOURCE-0000000006 (stores: [appconnect_user_stream-STATE-STORE-0000000004])
--> none
<-- KSTREAM-SOURCE-0000000005
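All of these operations use the same key. All topics have 50 partitions, on 5 brokers. I run with a concurrency of 10 and scale my application to 5 instances. But as I said, I repartition and transfer the data on the same key 3-4 times, which means all values from my flatMap and map operations go to the same partition. Only once or twice do I use a different key, so only then are messages distributed to different partitions. Does this affect my performance? Or should I definitely spread the data over different partitions to improve performance?
So basically: does Kafka perform better when the 3-4 joins or repartition operations all use the same partition across topics, because Kafka then reads from only one partition, knows exactly where to read, and reads all the data at once since the data is stored physically in parallel on disk (I mean SSD or HDD)? Or, my second case: should I definitely use more partitions so the data is read in parallel across partitions?
I also think that using peek slows down my processing.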
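The premise of this question is that records with the same key always land in the same partition. A small stdlib-only sketch can illustrate it. The sketch is modeled on Kafka's default partitioning for keyed records, which is roughly partition = toPositive(murmur2(serializedKey)) % numPartitions. To my understanding, Kafka Streams uses the same hashing by default when it writes repartition topics. The class name KeyPartitioning is hypothetical, and the sketch assumes keys are serialized as UTF-8 strings. The murmur2 method is intended to mirror Kafka's Utils.murmur2, but treat any concrete partition numbers as illustrative.

```java
import java.nio.charset.StandardCharsets;

/**
 * Illustrative sketch (hypothetical class) of murmur2-based key partitioning,
 * assuming keys are serialized as UTF-8 strings.
 */
public class KeyPartitioning {

    // 32-bit murmur2 hash, intended to mirror Kafka's Utils.murmur2.
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (fall-through is intentional).
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // The partition depends only on the key bytes and the partition count.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 50; // the partition count from the question
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Running main prints the same partition for both "user-1" lines. The mapping is a pure function of the key and the partition count, so every repartition on an unchanged key sends all records for that key to a single partition of the repartition topic. The sketch shows only this premise. It does not answer the performance question.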
Solution
The peek() operation is unrelated. From the topology description you posted, your program (in part) seems to look like this:
// sketch of the program implied by the topology (arguments elided)
KStream inputUser = builder.stream(...).flatMap(...).peek(...).filter(...);
KStream inputDevice = builder.stream(...).flatMap(...).peek(...).filter(...);
inputUser.join(inputDevice, ...);
(It would be easier if you also posted your code in the question.)
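Because you call flatMap(), Kafka Streams assumes that you changed the key, so the later call to join() triggers data repartitioning. The repartition topic name is generated from an upstream operator (to be fair, I am not 100% sure why PEEK is picked instead of FILTER).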
"All of these operations use the same key."
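In that case, you might want to use flatMapValues() instead of flatMap(). With flatMapValues(), Kafka Streams knows that the key did not change, so it will not create a repartition topic.
Similarly, if the key does not change, you might want to use mapValues() instead of map() to avoid unnecessary repartitioning.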
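The following plain-Java toy model illustrates the reasoning above. It is not the Kafka Streams API or its internal code. RepartitionModel, ToyStream, joinWithTable and the "repartition" plan step are all hypothetical names. The model assumes the behavior the answer describes:

- flatMap() and map() mark the key as possibly changed.
- peek(), filter(), mapValues() and flatMapValues() only carry that mark forward.
- A join inserts a repartition step when the mark is set.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (hypothetical, NOT Kafka Streams code) of how a
 * "repartition required" flag flows through a chain of operators.
 */
public class RepartitionModel {

    // Hypothetical stand-in for a stream: the operator chain plus the flag.
    static final class ToyStream {
        final List<String> ops;
        final boolean repartitionRequired;

        ToyStream(List<String> ops, boolean repartitionRequired) {
            this.ops = ops;
            this.repartitionRequired = repartitionRequired;
        }

        // Appends an operator; once the flag is set, it stays set.
        private ToyStream then(String op, boolean keyMayChange) {
            List<String> next = new ArrayList<>(ops);
            next.add(op);
            return new ToyStream(next, repartitionRequired || keyMayChange);
        }

        ToyStream flatMap()       { return then("flatMap", true); }
        ToyStream map()           { return then("map", true); }
        ToyStream flatMapValues() { return then("flatMapValues", false); }
        ToyStream mapValues()     { return then("mapValues", false); }
        ToyStream peek()          { return then("peek", false); }
        ToyStream filter()        { return then("filter", false); }

        // Plans a join: a repartition step is added only if the key may have changed.
        List<String> joinWithTable() {
            List<String> plan = new ArrayList<>(ops);
            if (repartitionRequired) {
                plan.add("repartition");
            }
            plan.add("join");
            return plan;
        }
    }

    static ToyStream source() {
        return new ToyStream(List.of("source"), false);
    }

    public static void main(String[] args) {
        // Shape of the program in the question: flatMap -> peek -> filter -> join.
        System.out.println(source().flatMap().peek().filter().joinWithTable());
        // Same chain with flatMapValues: no repartition step is planned.
        System.out.println(source().flatMapValues().peek().filter().joinWithTable());
    }
}
```

Running main prints [source, flatMap, peek, filter, repartition, join] and then [source, flatMapValues, peek, filter, join]. In the model, peek() never touches the key. The repartition step appears only because the flag set by flatMap() travels through peek() and filter() to the join.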
"My question is that I can read the topic with 'topic read push-processing-KSTREAM-PEEK-0000000014-repartition', but I cannot read it when I say 'topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning'."
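I am not sure what you mean by that. What does
"when I say 'topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning'"
mean? Are you referring to the command-line tool bin/kafka-console-consumer.sh? In general, yes, you can read repartition topics, but I am not sure why that would be useful.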
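The topic names differ between the topology and the broker because Kafka Streams prefixes internal topic names with the application.id. That is why the topology description shows KSTREAM-PEEK-0000000014-repartition, while the topic on the broker is push-processing-KSTREAM-PEEK-0000000014-repartition. The sketch below makes this mapping explicit and builds the console-consumer command that reads a topic from the beginning. It assumes application.id = "push-processing", inferred from the topic name. It also assumes a placeholder broker at localhost:9092. The class and method names are hypothetical.

```java
/**
 * Hypothetical helper: maps a repartition topic name from the topology
 * description to its broker-side name and builds a console-consumer command.
 */
public class InternalTopicName {

    // Kafka Streams prefixes internal topics with "<application.id>-".
    static String brokerTopicName(String applicationId, String topologyTopicName) {
        return applicationId + "-" + topologyTopicName;
    }

    // bootstrapServers is a placeholder; substitute your own broker list.
    static String consoleConsumerCommand(String bootstrapServers, String topic) {
        return String.join(" ",
                "bin/kafka-console-consumer.sh",
                "--bootstrap-server", bootstrapServers,
                "--topic", topic,
                "--from-beginning");
    }

    public static void main(String[] args) {
        // Assumes application.id = "push-processing", as the topic name suggests.
        String topic = brokerTopicName("push-processing", "KSTREAM-PEEK-0000000014-repartition");
        System.out.println(topic);
        System.out.println(consoleConsumerCommand("localhost:9092", topic));
    }
}
```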