首页 > 解决方案 > dstream 解析 JSON 并保存到 textFile:SparkStreaming

问题描述

我有一个 Kakfa 主题,其中数据以 JSON 格式存储。我编写了一个 spark 流代码,我只想将 Kafka 主题中的值保存到 HDFS 中的文件中。

这就是我的 kafka 主题中的数据的样子:

{"group_city":"\"Washington\"","group_country":"\"us\"","event_name":"\"Outdoor Afro Goes Ziplining\""}

下面是我写的代码。当我打印它时,我得到了解析的 JSON,但是当我尝试只将值保存到文本文件时,我的问题就来了。

val dstream = KafkaUtils.createDirectStream[String, String](ssc,preferredHosts,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

//___PRINTING RECORDS________
val output= dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val values = record.value()
    val tweet = scala.util.parsing.json.JSON.parseFull(values)
    val map:Map[String,String] = tweet.get.asInstanceOf[Map[String, String]]
    map.foreach(p => println(p._2))
  }
}

标签: apache-sparkapache-kafkaspark-streaming

解决方案


您可以使用 rdd 保存 rdd saveAsTextFile,但是由于您只想保存可以转换为数据框并写为csv

dstream.foreachRDD(rawRDD => {

  // get the data 
  val rdd = rawRDD.map(_._2)

  rdd.saveAsTextFile("file path")

  //      or read the json String to dataframe and write as a csv

  spark.read.json(rdd).write.mode(SaveMode.Append).csv("path for output")
})

希望这可以帮助!


推荐阅读