Spark / PySpark: write JSON keyed by the values of one column

Problem description

Suppose I have a simple table in a DataFrame with the schema:

a string, b string, c string

For example:

a     b      c
cat   3-3    78-b
cat   3-3    89-0
cat   4-4    78-n 
dog   4-4    89-b

and so on. I want to partition this table by column a and save each partition as a separate JSON file.

Additionally, I want each partition to be a single JSON file in which the values of column b are used as keys. For example:

File cat.json:
     {
       "3-3": {"b": "3-3", "c": "78-b"},
       "3-3": {"b": "3-3", "c": "89-0"},
       "4-4": {"b": "4-4", "c": "78-n"}
     }
File dog.json:
     {
       "4-4": {"b": 4-4, "c": "89-b"}
     }

Is there a way to do this in pyspark? Thanks.

Tags: apache-spark, pyspark, apache-spark-sql, pyspark-dataframes

Solution


Just add a row-mapping step to the DataFrame; the explanation is inline in the code below. (The example is written in Scala; a PySpark sketch of the same idea follows after it.)

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._

object CatDog {

  def main(args: Array[String]): Unit = {

    // Build a local SparkSession (the original used a project-specific helper)
    val spark = SparkSession.builder()
      .appName("CatDog")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val df = List(
      ("cat", "3-3", "78-b"),
      ("cat", "3-3", "89-0"),
      ("cat", "4-4", "78-n"),
      ("dog", "4-4", "89-b")
    ).toDF("a", "b", "c")

    // Collect the distinct values of column "a" on the driver.
    // A Dataset cannot be referenced inside another Dataset's map(),
    // so the per-value filtering has to be driven from here.
    val aValues = df.select("a").distinct().as[String].collect()

    // Write one JSON output per value of "a".
    // Note: .write.json(path) produces a directory of part files, not a single
    // file, and each row becomes one JSON line under a column named "value".
    aValues.foreach { aVal =>
      df.filter(col("a") === aVal)
        .map(row => parseDF(row))
        .write
        .mode("overwrite")
        .json(s"src/main/resources/$aVal.json")
    }
  }

  // Row mapping logic: key each record by its "b" value
  def parseDF(row: Row): Map[String, Map[String, String]] = {

    val b = row.getString(1)
    val c = row.getString(2)
    Map(b -> Map("b" -> b, "c" -> c))

  }

}
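
Since the question asks for PySpark, here is a minimal PySpark sketch of the same idea. It assumes the rows for each value of a are small enough to collect to the driver, so that each group can be written as a single JSON object keyed by b; the output directory output/ and the session name are placeholders.

import json
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("cat-dog-json").getOrCreate()

df = spark.createDataFrame(
    [("cat", "3-3", "78-b"),
     ("cat", "3-3", "89-0"),
     ("cat", "4-4", "78-n"),
     ("dog", "4-4", "89-b")],
    ["a", "b", "c"],
)

os.makedirs("output", exist_ok=True)

# One output file per distinct value of column "a"
for (a_val,) in df.select("a").distinct().collect():
    rows = df.filter(F.col("a") == a_val).collect()
    # Key each record by its "b" value. A plain dict keeps only the last
    # record for a duplicated "b", since JSON objects cannot repeat keys.
    keyed = {row["b"]: {"b": row["b"], "c": row["c"]} for row in rows}
    with open(f"output/{a_val}.json", "w") as f:
        json.dump(keyed, f, indent=2)

Collecting each group to the driver only makes sense for modest data volumes; for larger tables, writing with df.write.partitionBy("a").json(...) and re-keying the part files in a post-processing step would be one way to avoid it.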

