Parsing JSON-formatted RDD values into separate values in Spark

Problem description

I am trying to perform some kind of flatMap in Spark (Scala) on an RDD with N values, one of which is in JSON format.

For example, when I print the RDD I get something like this:

myRDD.collect().foreach(println)

[2020,{'COL_A': 1064.3667, 'col_B': 14534.2}]
[2020,{'COL_A': 1064.3667, 'col_B': 145.2}]
[2020,{'COL_A': 1064.3667, 'col_B': 15576.2}]

I would like something like this:

[2020,1064.3667,14534.2]
[2020,1064.3667,145.2]
[2020,1064.3667,15576.2]

I don't know whether this can be done with a flatMap...

Thanks!

Tags: json, scala, apache-spark

Solution


Parse the JSON with the json4s library that ships with Spark. Note that since each input record produces exactly one output record, a plain map is enough here; flatMap would only be needed if a record could expand into zero or more rows.

Import the required libraries:

scala> import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.JsonMethods._

scala> import org.json4s._
import org.json4s._
scala> val rdd = spark
  .sparkContext
  .parallelize(
    Seq(
      (2020, """{"COL_A": 1064.3667, "col_B": 14534.2}"""),
      (2020, """{"COL_A": 1064.3667, "col_B": 145.2}"""),
      (2020, """{"COL_A": 1064.3667, "col_B": 15576.2}""")
    )
  )
scala> rdd.collect.foreach(println)
(2020,{"COL_A": 1064.3667, "col_B": 14534.2})
(2020,{"COL_A": 1064.3667, "col_B": 145.2})
(2020,{"COL_A": 1064.3667, "col_B": 15576.2})
scala> :paste
// Entering paste mode (ctrl-D to finish)

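// For each record, parse the JSON string in the second tuple element into a
// Map and collect its numeric values; head/last then pick out COL_A and col_B.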
val transformedRdd = rdd.map { c =>
      implicit val formats = DefaultFormats
      val values = parse(c._2).extract[Map[String,Double]].values.toList
      (c._1,values.head,values.last)
}

// Exiting paste mode, now interpreting.

scala> transformedRdd.collect.foreach(println)
(2020,1064.3667,14534.2)
(2020,1064.3667,145.2)
(2020,1064.3667,15576.2)
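This works because the parsed Map has only two entries and Scala's small immutable Maps happen to preserve insertion order, but JSON object key order is not guaranteed in general. A more defensive variant, sketched below with the illustrative name byKeyRdd, looks each field up by name instead:

val byKeyRdd = rdd.map { case (year, json) =>
  implicit val formats: Formats = DefaultFormats
  val parsed = parse(json)
  // Extract each field explicitly rather than relying on key order.
  (year, (parsed \ "COL_A").extract[Double], (parsed \ "col_B").extract[Double])
}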
