apache-spark - 如何知道流式查询用于 Kafka 数据源的 Kafka 消费者组的名称?
问题描述
我通过 spark 结构化流从 kafka 主题中消费数据,该主题有 3 个分区。由于 Spark 结构化流不允许您显式提供 group.id 并为消费者分配一些随机 id,我尝试使用以下 kafka 命令检查消费者组 id
./kafka-consumer-groups.sh --bootstrap-server kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092 --list
output
spark-kafka-source-054e8dac-bea9-46e8-9374-8298daafcd23--1587684247-driver-0
spark-kafka-source-756c08e8-6a84-447c-8326-5af1ef0412f5-209267112-driver-0
spark-kafka-source-9528b191-4322-4334-923d-8c1500ef8194-2006218471-driver-0
以下是我的问题
1)为什么会创建3个消费群体?是因为3个分区吗?
2)有什么办法可以在火花应用程序中获取这些消费者组名称?
3) 即使我的 spark 应用程序仍在运行,但一段时间后,这些组名并没有出现在消费者组列表中。这是因为所有数据都被 spark 应用程序消耗了,并且那个 kafka 主题中没有更多数据了吗?
4)如果我对第 3 点的假设是正确的,如果有新数据到达,它是否会创建一个新的消费者组 ID,或者消费者组的名称将保持不变?
以下是我的阅读流
val inputDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
// .option("assign"," {\""+topic+"\":[0]}")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 60000)
.load()
我在应用程序中有 3 个写入流,如下所示
val df = inputDf.selectExpr("CAST(value AS STRING)","CAST(topic AS STRING)","CAST (partition AS INT)","CAST (offset AS INT)","CAST (timestamp AS STRING)")
val df1 = inputDf.selectExpr("CAST (partition AS INT)","CAST (offset AS INT)","CAST (timestamp AS STRING)")
//First stream
val checkpoint_loc1= "/warehouse/test_duplicate/download/chk1"
df1.agg(min("offset"), max("offset"))
.writeStream
.foreach(writer)
.outputMode("complete")
.option("checkpointLocation", checkpoint_loc1).start()
val result = df.select(
df1("result").getItem("_1").as("col1"),
df1("result").getItem("_2").as("col2"),
df1("result").getItem("_5").as("eventdate"))
val distDates = result.select(result("eventdate")).distinct
//Second stream
val checkpoint_loc2= "/warehouse/test_duplicate/download/chk2"
distDates.writeStream.foreach(writer1)
.option("checkpointLocation", checkpoint_loc2).start()
//Third stream
val kafkaOutput =result.writeStream
.outputMode("append")
.format("orc")
.option("path",data_dir)
.option("checkpointLocation", checkpoint_loc3)
.start()
流式查询在代码中只使用一次,并且没有连接。
执行计划
== Parsed Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Analyzed Logical Plan ==
key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Optimized Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Physical Plan ==
StreamingRelation kafka, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
解决方案
1)为什么会创建3个消费群体?是因为3个分区吗?
当然不是。这只是一个巧合。您似乎已经运行了 3 次应用程序,并且该主题有 3 个分区。
让我们重新开始备份它。
我删除了所有消费者组以确保我们重新开始。
$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0
$ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
Deletion of requested consumer groups ('spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0') was successful.
$ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0
Deletion of requested consumer groups ('spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0') was successful.
$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
// nothing got printed out
我创建了一个包含 5 个分区的主题。
$ ./bin/kafka-topics.sh --create --zookeeper :2181 --topic jacek-five-partitions --partitions 5 --replication-factor 1
Created topic "jacek-five-partitions".
$ ./bin/kafka-topics.sh --describe --zookeeper :2181 --topic jacek-five-partitions
Topic:jacek-five-partitions PartitionCount:5 ReplicationFactor:1 Configs:
Topic: jacek-five-partitions Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: jacek-five-partitions Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: jacek-five-partitions Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: jacek-five-partitions Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: jacek-five-partitions Partition: 4 Leader: 0 Replicas: 0 Isr: 0
我使用的代码如下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
object SparkApp extends App {
val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._
val q = spark
.readStream
.format("kafka")
.option("startingoffsets", "latest")
.option("subscribe", "jacek-five-partitions")
.option("kafka.bootstrap.servers", ":9092")
.load
.select($"value" cast "string")
.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start
q.awaitTermination()
}
当我运行上述 Spark Structured Streaming 应用程序时,我只创建了一个消费者组。
$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
spark-kafka-source-380da653-c829-45db-859f-09aa9b37784d-338965656-driver-0
这是有道理的,因为所有 Spark 处理都应该使用与分区一样多的 Kafka 消费者,但无论消费者数量如何,都应该只有一个消费者组(否则 Kafka 消费者会消费所有记录并且会有重复) .
2)有什么办法可以在火花应用程序中获取这些消费者组名称?
对此没有公共 API,因此答案是否定的。
但是,您可以“破解” Spark 并在公共 API 之下,直到使用此行的内部 Kafka 消费者:
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
甚至这条线是准确的:
val kafkaOffsetReader = new KafkaOffsetReader(
strategy(caseInsensitiveParams),
kafkaParamsForDriver(specifiedKafkaParams),
parameters,
driverGroupIdPrefix = s"$uniqueGroupId-driver")
只需找到KafkaMicroBatchReader
Kafka 数据源的 ,请求KafkaOffsetReader
知道的groupId
。这似乎可行。
尽管我的 spark 应用程序仍在运行,但一段时间后,这些组名并没有出现在使用者组列表中。这是因为所有数据都被 spark 应用程序消耗了,并且那个 kafka 主题中没有更多数据了吗?
这可能与KIP-211: Revise Expiration Semantics of Consumer Group Offsets相关,它说:
当达到与该分区关联的过期时间戳时,消费者组中主题分区的偏移量将过期。此过期时间戳通常受代理配置 offsets.retention.minutes 的影响,除非用户覆盖该默认值并使用自定义保留。
4)如果有新数据到达,它会创建新的消费者组ID还是消费者组的名称保持不变?
将保持不变。
此外,当组中至少有一个消费者处于活动状态时,不得删除该消费者组。
推荐阅读
- asp.net-core-mvc - 将 BeginCollectionItem 与 MVC 核心一起使用
- ajax - 来自 Ajax 的 Django 和 POST
- dsl - Xtext,如何正确扩展 DefaultAntlrTokenToAttributeIdMapper
- r - 如何从 R ggplot 中的调色板中选择某些颜色
- javascript - For 循环中的 Promise
- java - Android:Fragment 和 FragmentActivity
- angular - 在 mat-select 上重新选择相同值时触发事件
- javascript - 无法从动态 json 中获取数据
- python-3.x - 替换 xarray 中的零值
- python - Pandas - 根据列值有条件地选择列名