Can we fetch data from Kafka at specific offsets in Spark Structured Streaming batch mode?

Problem description

In Kafka I receive new topics dynamically, and I have to process each one with Spark from a specific offset. Is it possible to pass the offsets JSON in from a variable? For example, consider the code below:

val df = spark
 .read
 .format("kafka")
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
 .option("subscribePattern", "topic.*")
 .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
 .load()

Here I want to set the value of startingOffsets dynamically. I tried putting the value in a string variable and passing that, but it did not work, even though the query runs fine when I hard-code the very same value into startingOffsets. How can I use a variable in this case?

val start_offset= """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}"""
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", start_offset)
  .load()
This fails with:

java.lang.IllegalArgumentException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got """{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}"""
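The exception message is the clue: Spark reports it got """{...}""", i.e. the string it received literally begins and ends with three quote characters. So the variable's value included the quote characters themselves, presumably because the Scala triple-quote delimiters were copied along with the JSON into a config value or concatenated in. Only the JSON belongs in the variable. As a minimal sketch (topic and partitionOffsets are illustrative names, not from the question), the offsets JSON can be built from plain values with string interpolation, which cannot introduce stray quotes:

// Hypothetical inputs: one topic and a map of partition -> starting offset
// (in the Kafka source's offsets JSON, -2 means "earliest", -1 means "latest").
val topic = "topic1"
val partitionOffsets = Map(0 -> 23L, 1 -> -2L)

// Render each partition as "partition": offset, then wrap in the topic object.
val pairs = partitionOffsets
  .map { case (p, o) => s""""$p": $o""" }
  .mkString(", ")
val start_offset = s"""{"$topic": {$pairs}}"""
// start_offset == {"topic1": {"0": 23, "1": -2}}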

Tags: spark-structured-streaming

Solution


import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ReadSpecificOffsetFromKafka")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    spark.sparkContext.setLogLevel("error")
    import spark.implicits._

    // Start partition 0 of first_topic at offset 15, partition 1 at the
    // earliest available offset (-2), and partition 2 at offset 6.
    val start_offset = """{"first_topic" : {"0" : 15, "1": -2, "2": 6}}"""
    val fromKafka = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092, localhost:9093")
      .option("subscribe", "first_topic")
//      .option("startingOffsets", "earliest")
      .option("startingOffsets", start_offset)
      .load()

    // Kafka delivers key and value as binary; cast to readable types.
    val selectedValues = fromKafka.selectExpr("cast(value as string)", "cast(partition as integer)")

    selectedValues.writeStream
      .format("console")
      .outputMode("append")
//      .trigger(Trigger.Continuous("3 seconds"))
      .start()
      .awaitTermination()
  }

This is working code for consuming Kafka from specific offsets with Spark Structured Streaming and Scala.
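The answer above uses readStream, but the question's title asks about batch mode. The Kafka source also supports bounded reads via spark.read, where startingOffsets and endingOffsets together delimit the range to fetch. A sketch with placeholder hosts, assuming the same topic layout as above:

// Batch read: fetch a bounded offset range and stop.
// In the offsets JSON, -2 = earliest and -1 = latest.
val start_offset = """{"first_topic": {"0": 15, "1": -2, "2": 6}}"""
val end_offset   = """{"first_topic": {"0": -1, "1": -1, "2": -1}}"""

val batchDf = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "first_topic")
  .option("startingOffsets", start_offset)
  .option("endingOffsets", end_offset)
  .load()

batchDf.selectExpr("cast(value as string)", "cast(partition as integer)").show(false)

One caveat for the streaming case: startingOffsets only applies when a query is started for the first time; a query restarted from a checkpoint resumes from the checkpointed offsets instead.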

