apache-kafka - 流式查询使用多少个 Kafka 消费者来执行?
问题描述
我很惊讶地看到 Spark 只有一个 Kafka 消费者来使用来自 Kafka 的数据,而这个消费者在驱动程序容器中运行。我更希望看到,Spark 创建的消费者数量与主题中的分区数量一样多,并在执行器容器中运行这些消费者。
例如,我有一个包含 5 个分区的主题事件。我启动了我的 Spark Structured Streaming 应用程序,该应用程序使用该主题并写入 HDFS 上的 Parquet。该应用程序有 5 个执行者。在检查 Spark 创建的 Kafka 消费者组时,我发现只有一个消费者负责所有 5 个分区。这个消费者正在使用驱动程序的机器上运行:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group spark-kafka-source-08e10acf-7234-425c-a78b-3552694f22ef--1589131535-driver-0
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
events 2 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 1 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 0 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 4 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 3 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
在检查了所有 5 个 executor 的日志后,我发现其中只有一个忙于将消费数据写入 HDFS 上的 Parquet 位置。其他 4 人闲置。
这很奇怪。我的期望是 5 个执行器应该并行使用来自 5 个 Kafka 分区的数据并在 HDFS 上并行写入。这是否意味着驱动程序使用来自 Kafka 的数据并将其分发给执行程序?它看起来像一个瓶颈。
更新 1我尝试将repartition(5)添加到流数据帧:
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "brokerhost:9092")
.option("subscribe", "events")
.option("startingOffsets", "earliest")
.load()
.repartition(5)
之后,我看到所有 5 个执行程序都将数据写入 HDFS(根据他们的日志)。尽管如此,我在 Kafka 主题的所有 5 个分区上只看到一个消费者(驱动程序)。
更新 2 Spark 版本 2.4.0。以下是提交申请的命令:
spark-submit \
--name "Streaming Spark App" \
--master yarn \
--deploy-mode cluster \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.executor.instances=5 \
--conf spark.sql.shuffle.partitions=5 \
--class example.ConsumerMain \
"$jar_file"
解决方案
根据 Structured streaming 的文档,我可以看到它被提及为在执行程序Consumer Caching上创建的消费者。
推荐阅读
- javascript - 反应导航获取上一个屏幕的标题
- c# - 如何将带有文件的对象发布到后端?415 错误。c#, 角
- sql-server - 如何从视图中识别值属于哪个表?
- awk - awk 删除文件中带有字符的行
- c# - 使用 C# 的浏览器自动化 - 读写
- reactjs - 使用 react-intl 格式化瑞士德语
- ruby - 如何包含“auth_request”模块 nginx?
- python - 无法在 Seaborn stripplot 中使用“x”和“+”标记
- amazon-web-services - 使用 AWS Codepipeline 创建和 API 网关
- c++ - 生成指针元组