apache-spark - dstream 解析 JSON 并保存到 textFile:SparkStreaming
问题描述
我有一个 Kakfa 主题,其中数据以 JSON 格式存储。我编写了一个 spark 流代码,我只想将 Kafka 主题中的值保存到 HDFS 中的文件中。
这就是我的 kafka 主题中的数据的样子:
{"group_city":"\"Washington\"","group_country":"\"us\"","event_name":"\"Outdoor Afro Goes Ziplining\""}
下面是我写的代码。当我打印它时,我得到了解析的 JSON,但是当我尝试只将值保存到文本文件时,我的问题就来了。
val dstream = KafkaUtils.createDirectStream[String, String](ssc,preferredHosts,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
//___PRINTING RECORDS________
val output= dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val values = record.value()
val tweet = scala.util.parsing.json.JSON.parseFull(values)
val map:Map[String,String] = tweet.get.asInstanceOf[Map[String, String]]
map.foreach(p => println(p._2))
}
}
解决方案
您可以使用 rdd 保存 rdd saveAsTextFile
,但是由于您只想保存可以转换为数据框并写为csv
dstream.foreachRDD(rawRDD => {
// get the data
val rdd = rawRDD.map(_._2)
rdd.saveAsTextFile("file path")
// or read the json String to dataframe and write as a csv
spark.read.json(rdd).write.mode(SaveMode.Append).csv("path for output")
})
希望这可以帮助!
推荐阅读
- c# - 如何处理 IEqualityComparer 中的空值?
- android - 来自 exoplayer 的 TimelineQueueNavigator 给出了错误的索引
- vba - 将自定义模具用作 Visio 形状的 VBA 填充图案
- jira - 根据状态过滤故事,但包括其所有子任务
- python - 打印到打印机 python turtle 图形图像
- azure-logic-apps - 本地 SQL Server 到 Azure
- magento - Magento 1 上的 SagePay
- mysql - Spark MariaDB jdbc SQL 查询返回列名而不是列值
- angular - 如何循环通过角度获取请求响应
- javascript - 如何通过 JavaScript 从 URL 中获取变量?