首页 > 解决方案 > KafkaUtils.createRDD的简单Spark Structured Streaming等价物,即通过指定偏移量将kafka主题读取到RDD?

问题描述

如何通过指定开始和结束偏移量将 kafka 主题中的数据读取到 RDD?

KafkaUtils.createRDD is是实验性的,API 相当不愉快(它返回一个臃肿的 JavaConsumerRecord类,它甚至不能序列化并将其放入KafkaRDD,它覆盖了很多方法(如 persist)只是抛出一个异常。

我想要的是这样一个简单的 API:

case class Message(key: String, 
                   value: String, 
                   offset: Long, 
                   timestamp: Long)

def readKafka(topic: String, offsetsByPartition: Map[Int, (Long, Long)])
             (config: KafkaConfig, sc: SparkContext): RDD[Message]

或类似的地方key: Array[Byte]value: Array[Byte]

标签: scalaapache-sparkapache-kafkaspark-structured-streaming

解决方案


要从带有偏移量的 kafka 中读取,代码如下所示,如此处所引用

val df = 
  spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()

以上将读取偏移量内可用的数据,然后您可以将列转换为字符串,然后转换为您的对象Message

val messageRDD: RDD[Message] = 
  df.select(
    col("key").cast("string"), 
    col("value").cast("string"), 
    col("offset").cast("long"),
    col("timestamp").cast("long")
  ).as[Message].rdd

推荐阅读