apache-kafka - Kafka 流应用程序无法在单台机器上使用更多 CPU 内核和线程进行扩展
问题描述
我正在编写一个 kafka-streams 应用程序,它基本上从 avro 记录中提取 2 种类型的键并在指定的窗口中对它们进行计数。它应该每秒处理约 6k 个事件。
我遇到的问题:
- 一个
c4.8xlarge
带有线程的实例num.stream.threads = 20
(输入主题的分区数)每秒仅消耗约 2.5k 事件 - 具有线程的相同实例以
num.stream.threads = 10
相同的速率消耗事件 - 四个
c4.2xlarge
实例,num.stream.threads = 5
每秒消耗多达 10-25k 个事件
我从未见过任何核心的 CPU 利用率高于 70%。网络也未充分利用。
这是我的流配置:
kafka.streaming {
compression.type = "lz4"
acks = 1
retries = 1
// I care about throughput more than about latency
max.poll.records = 6000
fetch.min.bytes = 3300000 // 6000 * 550 (average record size)
fetch.max.wait.ms = 1000 // we get 6000 records in 1 second
batch.size = 165000 // (6000 / 20) * 550
linger.ms = 1000
}
经纪人版本:0.10.2.1
卡夫卡流版本:1.1.1
这似乎令人惊讶,因为我认为只要有足够的分区,我就可以线性扩展 kafka 处理,无论消费者位于何处,在单台机器上还是在多台机器上。
许多 EC2 实例可以解决可扩展性问题,但我想在单个实例上运行我的应用程序,因为聚合必须通过交互式查询公开,我不想开发 RPC 层。
UPD:流定义
signalStream
.map[EventDetailsGroup, java.lang.Short]((_, v) => new KeyValue(extractEventDetailsGroup(v), Short.box(1)))
.groupByKey(Serialized.`with`(eventDetailsSerde, Serdes.Short()))
.windowedBy(TimeWindows.of(30 * 60 * 1000).advanceBy(60 * 1000))
.count(Materialized.as("store-name").withCachingDisabled().withLoggingDisabled())
解决方案
推荐阅读
- json - Backstopjs 网站加载问题未通过延迟解决:出了什么问题?
- awk - 循环中的 awk '奇迹般地' 更改 -v 选项中的变量值
- java - 图形未出现在窗口中
- python - Django inlineformset_factory 添加行号
- google-cloud-platform - 在 GCE 上安装 Hashicorp 保险库
- nuget - 上传 NuGet 包,如何设置许可?
- java - Libgdx 桌面不运行 - 警告:发生了非法反射访问操作
- javascript - 隐藏的侧面板菜单的填充使其部分可见
- firebase - 在 Vuex 商店中找不到操作
- python - TypeError:不可散列的类型:sqlalchemy 插入查询的“列表”错误