apache-kafka - Kafka 通过多个分区和多个消费者线程提高吞吐量
问题描述
我正在将 kafka 流用于某些应用程序。
流流如下
kafkaProducer---->StreamerConsumer1->finalCosumer
我的生产者写入数据非常快,我的 StreamConsumer 会将每个流映射到某个进程并将流转发到其他主题。
在我的 StreamCosumer 地图中,我添加了自己的映射器函数,该函数实际上试图保留其相关数据,如下所示
public void checkRecord(T1 key, T2 value) {
switch(T1.toString()){
case "key1":
//Get relavant fileds from value and perisit in db
break;
case "key2":
//Get relavant fileds from value and perisit in db
break;
}
}
KStream<String, KafkaStatusRecordWrapper> pDStream[] = myStream.map(this::checkRecord).branch((key, value)-> value.isSuccess(),(key, value)-> !value.isSuccess());
pDStream[0].mapValues(value -> transformer(value)).to("other_topic",Produced.with(stringSerde, stringSerde));
现在我的 checkRecord 记录消费者函数是单线程的,几乎需要 300 毫秒(由于某些业务逻辑和我无法避免的数据库持久性)才能返回。
我不能增加分区的数量,因为我们的基础设施有一些限制,也由于以下限制
More Partitions Requires More Open File Handles
More Partitions May Increase Unavailability
More Partitions May Increase End-to-end Latency
所以我打算写多线程流消费者。
但我担心以下几点。
- 我只需要处理一次记录
- 移交给另一个线程将导致偏移管理问题。
那么如何提高吞吐量呢?
我的消费者有足够的资源,只使用了 40% 的资源。
解决方案
您可以设置流配置num.stream.threads
来配置线程数。最大值可以是最大分区数。它有助于增加应用程序实例的并行性。
假设您的主题有 4 个分区,您可以设置以下内容:
properties.set("num.stream.threads",4);
推荐阅读
- javascript - 如何在 JavaScript 中连接两个结构相同的 JSON 数据集?
- pine-script - 谁能检查一下为什么我没有收到信号并收到“未声明的标识符”错误
- python - 嵌套循环内的python变量重新分配不起作用
- android - 使用 HC-05 将字符串从 arduino 发送到 android 时接收垃圾数据
- sql - GROUP BY 子句不返回每个组 ID 中的获胜者
- f# - 可以在一行中优化模式匹配代码段吗?
- excel - 运行for循环一次而不是多次删除行
- wordpress - SSL 混合内容,但我在 ftp 上找不到图像
- python - Python ConfigParser .ini 解析和可移植变量替换
- python - 如何将坐标保存到 Elasticsearch 中的索引并在 Kibana 中使用