首页 > 解决方案 > kafka json数组消费者scala

问题描述

如果我们生成如下 JSON 数据,

[{"name":"Andy", "age":30}]
[{"name":"Romen", "age":20}]

如何使用 scala 在 Kafka 消费者上使用它。我试着把它读成 .select(cast(value as String))。但不起作用。请帮忙

我读取数据如下:

val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:host")   
      .option("subscribe", "topic1")
      .option("startingOffsets", "earliest") 
      .load().selectExpr("CAST(value AS STRING)")

标签: jsonscalakafka-consumer-api

解决方案


我只是创建一个示例代码

def main(args: Array[String]): Unit = {

  val spark = SparkSession
    .builder
    .appName("Example")
    .master("local")
    .getOrCreate()

  val ssc = new StreamingContext(spark.sparkContext, Seconds(2))

  val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .load()

  import spark.implicits._

  val result = df.selectExpr("CAST(value AS STRING)")
      .as[String]

  result.writeStream
      .format("console")
      .start()
      .awaitTermination()

    ssc.start()
    ssc.awaitTermination()

}

如果您想了解更多信息,请从 Kafka 读取消息并将其写入控制台,请在此处查看:https ://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html


推荐阅读