Kafka Streams job with high disk IO

Problem description

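I'm having a problem with a KStreams job that consumes a lot of disk IO. It runs on an Azure VM with a maximum of 500 IOPS, and processing 400 records per second consumes about 500 tps (as reported by iostat on RHEL). The job pulls records from a Kafka topic (12 partitions), deconstructs the JSON string, does some processing (string search, replacement and data enrichment), and then passes the result to a Kafka sink (6 partitions). As far as I can tell, this job should be stateless and should not need RocksDB or any other state store.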
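A quick back-of-the-envelope check of these numbers, treating iostat's tps as roughly equal to IOPS (an assumption): 500 operations for 400 records is 1.25 IO operations per record, and at that cost a 500 IOPS disk tops out at exactly 400 records per second. That suggests the job is bound by the VM's IOPS limit rather than by CPU.

```java
// Back-of-the-envelope IO budget using the figures from the question.
// Assumption: iostat "tps" is treated as roughly equal to IOPS.
public class IoBudget {
    // Disk operations spent per processed record.
    static double ioPerRecord(double tps, double recordsPerSec) {
        return tps / recordsPerSec;
    }

    // Highest record rate the disk can sustain if each record costs `ioPerRecord` operations.
    static double maxRecordsPerSec(double maxIops, double ioPerRecord) {
        return maxIops / ioPerRecord;
    }

    public static void main(String[] args) {
        double perRecord = ioPerRecord(500, 400);           // 500 tps / 400 records/s
        double ceiling = maxRecordsPerSec(500, perRecord);  // 500 IOPS cap / IO per record
        System.out.printf("IO per record: %.2f%n", perRecord);
        System.out.printf("Throughput ceiling at 500 IOPS: %.0f records/s%n", ceiling);
    }
}
```

Running it prints `IO per record: 1.25` and `Throughput ceiling at 500 IOPS: 400 records/s`, i.e. the observed throughput sits right at the disk's limit.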

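I have commit.interval.ms=60000 (1 minute), set through Properties with StreamsConfig.COMMIT_INTERVAL_MS_CONFIG=60000, and I have tried other values; none of them seems to affect the disk IO or how often we see writes to .checkpoint.tmp. I have tried running 1 to 12 threads using StreamsConfig.NUM_STREAM_THREADS_CONFIG. Both the IO and the number of records the job can process per second scale directly with the thread count: more threads means more IO and higher throughput. I have turned off all logging (DEBUG, INFO, etc.), so there should be no IO from that.

As far as I can tell, the job is constantly writing to .checkpoint.tmp, and imjournal.state.tmp (written by the in:imjournal process) is also being hit a lot. I got this information by enabling block-level logging with echo 1 > /proc/sys/vm/block_dump and then watching the output with dmesg -wH. A small sample of what I see in dmesg is included at the end of this question.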
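For reference, the two settings mentioned above could be assembled as follows. This is a minimal sketch using plain java.util.Properties so it has no Kafka dependency; the keys are the documented string values behind StreamsConfig.APPLICATION_ID_CONFIG, BOOTSTRAP_SERVERS_CONFIG, COMMIT_INTERVAL_MS_CONFIG and NUM_STREAM_THREADS_CONFIG, while the application id and broker address are hypothetical placeholders.

```java
import java.util.Properties;

// Builds the Kafka Streams settings discussed in the question.
// Keys equal the StreamsConfig constants; "my-enrichment-app" and
// "localhost:9092" are hypothetical placeholders.
public class StreamsProps {
    static Properties buildConfig(int numThreads) {
        Properties props = new Properties();
        props.put("application.id", "my-enrichment-app");    // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", "localhost:9092");     // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("commit.interval.ms", "60000");             // StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1 minute
        props.put("num.stream.threads", Integer.toString(numThreads)); // StreamsConfig.NUM_STREAM_THREADS_CONFIG
        return props;
    }

    public static void main(String[] args) {
        // 12 threads to match the 12 partitions of the input topic.
        Properties props = buildConfig(12);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a real application the resulting Properties would be passed to `new KafkaStreams(topology, props)`. Kafka Streams creates one task per input partition, so with a 12-partition input topic any thread beyond the twelfth would sit idle.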

Does anyone have tips on settings I could try, or on where to look for what is causing so much IO?

Thanks in advance.

[ +0.001097] in:imjournal(1190): dirtied inode 50351525 (imjournal.state.tmp) on sda2
[ +0.000002] in:imjournal(1190): dirtied inode 50351525 (imjournal.state.tmp) on sda2
[ +0.005463] java(73280): dirtied inode 17178982 (.checkpoint.tmp) on sda2
[ +0.000003] java(73280): dirtied inode 17178982 (.checkpoint.tmp) on sda2
[ +0.000022] java(73280): WRITE block 16922536 on sda2 (8 sectors)
[ +0.000134] java(73280): WRITE block 33057066 on sda2 (14 sectors)
[ +0.000855] java(73284): dirtied inode 17178985 (.checkpoint.tmp) on sda2
[ +0.000001] java(73281): dirtied inode 51158861 (.checkpoint.tmp) on sda2
[ +0.000002] java(73281): dirtied inode 51158861 (.checkpoint.tmp) on sda2
[ +0.000002] java(73284): dirtied inode 17178985 (.checkpoint.tmp) on sda2
[ +0.000017] java(73284): WRITE block 16922544 on sda2 (8 sectors)
[ +0.000000] java(73281): WRITE block 50389296 on sda2 (8 sectors)
[ +0.000184] java(73282): dirtied inode 892020 (.checkpoint.tmp) on sda2
[ +0.000000] java(73283): dirtied inode 51158863 (.checkpoint.tmp) on sda2
[ +0.000002] java(73283): dirtied inode 51158863 (.checkpoint.tmp) on sda2
[ +0.000002] java(73282): dirtied inode 892020 (.checkpoint.tmp) on sda2
[ +0.000015] java(73282): WRITE block 1140808 on sda2 (8 sectors)
[ +0.000001] java(73283): WRITE block 50389304 on sda2 (8 sectors)
[ +0.000576] in:imjournal(1190): dirtied inode 50351527 (imjournal.state.tmp) on sda2
[ +0.000002] in:imjournal(1190): dirtied inode 50351527 (imjournal.state.tmp) on sda2
[ +0.000671] in:imjournal(1190): dirtied inode 50351525 (imjournal.state.tmp) on sda2
[ +0.000002] in:imjournal(1190): dirtied inode 50351525 (imjournal.state.tmp) on sda2
[ +0.001975] java(73285): dirtied inode 35443635 (.checkpoint.tmp) on sda2
[ +0.000002] java(73285): dirtied inode 35443635 (.checkpoint.tmp) on sda2
[ +0.000015] java(73285): WRITE block 34930256 on sda2 (8 sectors)
[ +0.001344] java(73282): WRITE block 33057080 on sda2 (23 sectors)
[ +0.006529] java(73280): dirtied inode 17178984 (.checkpoint.tmp) on sda2
[ +0.000003] java(73280): dirtied inode 17178984 (.checkpoint.tmp) on sda2
[ +0.000016] java(73280): WRITE block 16922280 on sda2 (8 sectors)
[ +0.002142] java(73280): WRITE block 33057103 on sda2 (7 sectors)
[ +0.001010] in:imjournal(1190): dirtied inode 50351527 (imjournal.state.tmp) on sda2
[ +0.000003] in:imjournal(1190): dirtied inode 50351527 (imjournal.state.tmp) on sda2
[ +0.003449] java(73281): dirtied inode 51158862 (.checkpoint.tmp) on sda2
[ +0.000000] java(73284): dirtied inode 17178983 (.checkpoint.tmp) on sda2
[ +0.000002] java(73284): dirtied inode 17178983 (.checkpoint.tmp) on sda2
[ +0.000003] java(73281): dirtied inode 51158862 (.checkpoint.tmp) on sda2
[ +0.000017] java(73281): WRITE block 50389280 on sda2 (8 sectors)
[ +0.000001] java(73284): WRITE block 16924528 on sda2 (8 sectors)
[ +0.000224] java(73283): dirtied inode 51158864 (.checkpoint.tmp) on sda2
[ +0.000003] java(73283): dirtied inode 51158864 (.checkpoint.tmp) on sda2
[ +0.000001] java(73285): dirtied inode 35443622 (.checkpoint.tmp) on sda2
[ +0.000001] java(73285): dirtied inode 35443622 (.checkpoint.tmp) on sda2
[ +0.000016] java(73283): WRITE block 50389376 on sda2 (8 sectors)
[ +0.000001] java(73285): WRITE block 34930264 on sda2 (8 sectors)
[ +0.000171] java(73282): dirtied inode 892021 (.checkpoint.tmp) on sda2
[ +0.000003] java(73282): dirtied inode 892021 (.checkpoint.tmp) on sda2
[ +0.000018] java(73282): WRITE block 1140824 on sda2 (8 sectors)
[ +0.000331] in:imjournal(1190): dirtied inode 50351525 (imjournal.state.tmp) on sda2
[ +0.000003] in:imjournal(1190): dirtied inode 50351525 (imjournal.state.tmp) on sda2
[ +0.000351] java(73284): WRITE block 33057110 on sda2 (25 sectors)
[ +0.000357] in:imjournal(1190): dirtied inode 50351527 (imjournal.state.tmp) on sda2
[ +0.000002] in:imjournal(1190): dirtied inode 50351527 (imjournal.state.tmp) on sda2
[ +0.004964] java(73280): dirtied inode 17178982 (.checkpoint.tmp) on sda2
[ +0.000002] java(73280): dirtied inode 17178982 (.checkpoint.tmp) on sda2
[ +0.000018] java(73280): WRITE block 16922728 on sda2 (8 sectors)
[ +0.000996] java(73285): WRITE block 33057135 on sda2 (26 sectors)
[ +0.004757] java(73284): dirtied inode 17178985 (.checkpoint.tmp) on sda2
[ +0.000002] java(73284): dirtied inode 17178985 (.checkpoint.tmp) on sda2
[ +0.000016] java(73284): WRITE block 16922720 on sda2 (8 sectors)
[ +0.000310] java(73281): dirtied inode 51158861 (.checkpoint.tmp) on sda2
[ +0.000002] java(73281): dirtied inode 51158861 (.checkpoint.tmp) on sda2
[ +0.000015] java(73281): WRITE block 50389288 on sda2 (8 sectors)
[ +0.001163] in:imjournal(1190): dirtied inode 50351525 (imjournal.state.tmp) on sda2
[ +0.000003] in:imjournal(1190): dirtied inode 50351525 (imjournal.state.tmp) on sda2
[ +0.000082] java(73284): WRITE block 33057161 on sda2 (17 sectors)
[ +0.004419] java(73285): dirtied inode 35443635 (.checkpoint.tmp) on sda2
[ +0.000003] java(73285): dirtied inode 35443635 (.checkpoint.tmp) on sda2
[ +0.000018] java(73285): WRITE block 34930280 on sda2 (8 sectors)
[ +0.000357] java(73282): dirtied inode 892020 (.checkpoint.tmp) on sda2
[ +0.000003] java(73282): dirtied inode 892020 (.checkpoint.tmp) on sda2
[ +0.000017] java(73283): dirtied inode 51158863 (.checkpoint.tmp) on sda2
[ +0.000002] java(73283): dirtied inode 51158863 (.checkpoint.tmp) on sda2
[ +0.000002] java(73282): WRITE block 1140816 on sda2 (8 sectors)
[ +0.000012] java(73283): WRITE block 50389272 on sda2 (8 sectors)
[ +0.000237] in:imjournal(1190): dirtied inode 50351527 (imjournal.state.tmp) on sda2
[ +0.000003] in:imjournal(1190): dirtied inode 50351527 (imjournal.state.tmp) on sda2
[ +0.000530] java(73280): dirtied inode 17178984 (.checkpoint.tmp) on sda2
[ +0.000002] java(73280): dirtied inode 17178984 (.checkpoint.tmp) on sda2
[ +0.000027] java(73280): WRITE block 16922536 on sda2 (8 sectors)
[ +0.000465] java(73285): WRITE block 33057178 on sda2 (38 sectors)
[ +0.000637] in:imjournal(1190): dirtied inode 50351525 (imjournal.state.tmp) on sda2
[ +0.000003] in:imjournal(1190): dirtied inode 50351525 (imjournal.state.tmp) on sda2
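To put numbers on a block_dump capture like the dmesg sample above, the lines can be tallied per file and per process. The sketch below is a hypothetical helper, not something from the original post; its regular expressions assume exactly the two line shapes shown in the sample ("name(pid): dirtied inode N (file) on dev" and "name(pid): WRITE block N on dev (K sectors)").

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: summarizes block_dump lines saved from dmesg.
public class BlockDumpSummary {
    // e.g. "java(73280): dirtied inode 17178982 (.checkpoint.tmp) on sda2"
    private static final Pattern DIRTIED =
        Pattern.compile("(\\S+\\(\\d+\\)): dirtied inode \\d+ \\(([^)]*)\\)");
    // e.g. "java(73280): WRITE block 16922536 on sda2 (8 sectors)"
    private static final Pattern WRITE =
        Pattern.compile("(\\S+\\(\\d+\\)): WRITE block \\d+ on \\S+ \\((\\d+) sectors\\)");

    // Counts "dirtied inode" events per file name.
    static Map<String, Integer> dirtiedByFile(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = DIRTIED.matcher(line);
            if (m.find()) counts.merge(m.group(2), 1, Integer::sum);
        }
        return counts;
    }

    // Sums WRITE sectors per process, keyed by "name(pid)".
    static Map<String, Integer> sectorsWrittenByProcess(List<String> lines) {
        Map<String, Integer> sectors = new TreeMap<>();
        for (String line : lines) {
            Matcher m = WRITE.matcher(line);
            if (m.find()) sectors.merge(m.group(1), Integer.parseInt(m.group(2)), Integer::sum);
        }
        return sectors;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java BlockDumpSummary <dmesg-output-file>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println("dirtied inodes by file: " + dirtiedByFile(lines));
        System.out.println("sectors written by process: " + sectorsWrittenByProcess(lines));
    }
}
```

Usage, assuming the output was first saved with something like `dmesg > block_dump.txt`: `java BlockDumpSummary block_dump.txt`. Keying by `name(pid)` keeps the individual Java threads (73280 to 73285 in the sample) apart, which makes it easier to check whether the checkpoint writes scale with the thread count.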

Tags: apache-kafka, kafka-consumer-api, apache-kafka-streams

Solution


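After talking with the Confluent community on Slack, I realized I was running org.apache.kafka:kafka-streams version 1.0.2. The latest version of the library is 2.3.0, so I upgraded my application to 2.2.1 and tried again. That version of the application runs with almost no noticeable tps or iowait, and with one application instance spawning 12 threads (the same number as my partitions), I was able to process up to 1,800 records per second. I then upgraded the application to 2.3.0 and got the same results, so I'm staying on that version for now.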
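Putting the reported figures side by side: 1,800 records per second over 12 threads is 150 records per second per thread, 4.5 times the original 400 records per second. If the VM's 500 IOPS cap still applies (an assumption), the upgraded job must be spending at most about 0.28 IO operations per record, down from about 1.25 before.

```java
// Worked numbers for the post-upgrade result (2.2.1 / 2.3.0, 12 threads).
// Assumption: the 500 IOPS cap of the VM is unchanged after the upgrade.
public class UpgradeThroughput {
    // Average records per second handled by each stream thread.
    static double perThread(double recordsPerSec, int threads) {
        return recordsPerSec / threads;
    }

    // How many times faster the upgraded job runs.
    static double speedup(double afterRecordsPerSec, double beforeRecordsPerSec) {
        return afterRecordsPerSec / beforeRecordsPerSec;
    }

    // Largest per-record IO cost that still fits under the IOPS cap.
    static double maxIoPerRecord(double maxIops, double recordsPerSec) {
        return maxIops / recordsPerSec;
    }

    public static void main(String[] args) {
        System.out.printf("Per thread: %.0f records/s%n", perThread(1800, 12));
        System.out.printf("Speedup over 400 records/s: %.1fx%n", speedup(1800, 400));
        System.out.printf("Max IO per record at 500 IOPS: %.2f%n", maxIoPerRecord(500, 1800));
    }
}
```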

I'm not 100% sure why the upgrade fixed it, but I assume it has to do with the checkpointing changes the development team made in newer versions.

