首页 > 解决方案 > Kafka Direct Stream 是否自己创建消费者组(因为它不关心应用程序中给出的 group.id 属性)

问题描述

假设我刚刚推出了一个 Kafka 直接流 + 火花流应用程序。对于第一批,驱动程序中的 Streaming Context 连接到 Kafka 并获取 startOffset 和 endOffset。然后,它启动一个带有这些开始和结束偏移范围的 spark 作业,以便执行程序从 Kafka 中获取记录。我的问题从这里开始。当第二批的时候,流式上下文连接到 Kafka 的开始和结束偏移范围。当没有允许存储最后提交偏移值的消费者组(因为直接流不考虑 group.id)时,Kafka 如何提供这些范围?

标签: apache-sparkapache-kafkaspark-streamingspark-streaming-kafka

解决方案


使用 Kafka Consumer API 时总是有一个 Consumer Group。无论您处理哪种流(Spark Direct Streaming、Spark Structured Streaming、Kafka Consumer 的 Java/Scala API ......)。

因为直接流不考虑 group.id

查看 Spark + Kafka直接流式集成指南(spark-streaming-kafka010 的代码示例),了解如何声明消费者组:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

即使您没有在配置中声明消费者组,仍然会为您创建一个(随机)消费者组。

检查您的日志以查看您的应用程序中使用了哪个 group.id。


推荐阅读