apache-spark - Kafka Direct Stream 是否自己创建消费者组(因为它不关心应用程序中给出的 group.id 属性)
问题描述
假设我刚刚推出了一个 Kafka 直接流 + 火花流应用程序。对于第一批,驱动程序中的 Streaming Context 连接到 Kafka 并获取 startOffset 和 endOffset。然后,它启动一个带有这些开始和结束偏移范围的 spark 作业,以便执行程序从 Kafka 中获取记录。我的问题从这里开始。当第二批的时候,流式上下文连接到 Kafka 的开始和结束偏移范围。当没有允许存储最后提交偏移值的消费者组(因为直接流不考虑 group.id)时,Kafka 如何提供这些范围?
解决方案
使用 Kafka Consumer API 时总是有一个 Consumer Group。无论您处理哪种流(Spark Direct Streaming、Spark Structured Streaming、Kafka Consumer 的 Java/Scala API ......)。
因为直接流不考虑 group.id
查看 Spark + Kafka直接流式集成指南(spark-streaming-kafka010 的代码示例),了解如何声明消费者组:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
即使您没有在配置中声明消费者组,仍然会为您创建一个(随机)消费者组。
检查您的日志以查看您的应用程序中使用了哪个 group.id。
推荐阅读
- flutter - Flutter - 当 onDoubleTap on icon 时滚动到顶部
- java - 如何将 rtp 数据包有效负载字节转换为任何音频数据?
- javascript - 如何拆分 JSON 文件的内容,然后显示用户标签而不是 ID?
- linux - 如果不在另一个匹配模式之后,sed 删除匹配模式的行
- node.js - NODEMAILER - 错误:缺少“PLAIN”的凭据
- android - Android Studio 无法打开文件
- android - RxAndroidBle: BleDisconnectedException: 从 MAC='XX:XX:XX:XX:XX:XX' 断开连接,状态为 0 (GATT_SUCCESS)
- css - 我的组件没有读取导入的 Slick CSS
- php - 目标类 [UserController] 不存在
- automatic-ref-counting - JSXGraph 如何用鼠标拖动圆弧