首页 > 解决方案 > 如何将数据帧作为 json 数组写入文件?(斯卡拉)

问题描述

我有一个数据框,我想将它作为 json 数组写入 scala 中的单个文件中。

尝试1:

dataframe.coalesce(1).write.format("json").save(destDir)

输出1: 每行一行,其中每一行是一个json

尝试2:

dataframe.toJSON.coalesce(1).write.format("json").save(destDir)

输出 2: 与输出 1 相同,但每行都有一个奇怪的 json {value: {key1:value1, key2:value2, ... }

尝试 3(使用 java PrintWriter 以字符串形式写入):

printWriter.write(dataframe.toJSON.collect.mkString("[",",","]"))

输出3:

它将一个 json 数组写入本地路径。如果路径是用于 hdfs 的,它会显示 FileNotFound,即使路径 + 文件存在。

标签: jsonscalaapache-sparkdataframe

解决方案


要将数据帧写为 json 数组,首先将数据帧转换为 json 字符串,然后转换这些字符串,以便每一行都是未来 json 文件中的一行,然后使用text而不是json

分析

要将数据帧写入 json,您可以从.toJSON尝试 2 和 3 中的方法开始:

val rawJson = dataframe.toJSON

现在您有一个数据框,其中一列value包含行的 json 表示形式为String.

要将此数据框转换为每行代表未来文件的一行的数据框,您需要:

  • 添加一个包含[作为数据框第一行的新行
  • 将逗号添加到代表您的 json 数据的所有行
  • 除了带有 json 数据的最后一行
  • 添加一个包含]作为数据框最后一行的新行

如您所见,“第一个”和“最后一个”之类的概念在您的情况下很重要,因此您需要在数据框中构建行的顺序。你可以这样关联它:

+-------+--------------------+------------+
| order | row                | value      |
+-------+--------------------+------------+
| 0     | first row          | "["        |
| 1     | row with json      | "  {...}," | 
| 1     | row with json      | "  {...}," | 
| ...   | ...                | ...        |
| 1     | row with json      | "  {...}," |
| 1     | row with json      | "  {...}," |
| 2     | last row with json | "  {...}"  |
| 3     | last row           | "]"        |
+-------+--------------------+------------+

首先,您可以将带有 json 的最后一行与其他行区分开来。为此,您可以使用窗口函数。您计算包含当前行和下一行的窗口中的行数,这意味着您将每一行与 2 相关联,除了最后一行没有下一行,因此您与 1 相关联。

val window = Window.rowsBetween(Window.currentRow, 1)

val jsonWindow = rawJson.withColumn("order", count("value").over(window))

但是,您希望最后一行在“order”列中有 2,而其他行在“order”列中有 1。您可以使用 modulo ( %) 函数来实现:

val jsonRowsWithOrder = jsonWindow.withColumn("order", (col("order") % lit(2)) + 1)

然后将逗号添加到除最后一行之外的所有行,这意味着您将逗号添加到列“order”设置为 1 的所有行:

val jsonRowsWithCommas = jsonRowsWithOrder.withColumn("value", when(col("order").equalTo(1), concat(col("value"), lit(","))).otherwise(col("value")))

最终文件中的那些行将被缩进,所以你缩进它们:

val indentedJsonRows = jsonRowsWithCommas.withColumn("value", concat(lit("  "), col("value")))

您添加第一行和最后一行,其中包含左方括号和右方括号:

val unorderedRows = indentedJsonRows.unionByName(Seq((0, "["), (3, "]")).toDF("order", "value"))

您订购它:

val orderedRows = unorderedRows.orderBy("order").drop("order")

您合并后只有一个分区,因为最后只需要一个文件:

partitionedRows = orderedRows.coalesce(1)

你把它写成文本

partitionedRows.write.text(destDir)

你就完成了!

完整的解决方案

这是完整的解决方案,带有导入。此解决方案适用于 spark 2.3(使用 spark 3.0 测试):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.rowsBetween(Window.currentRow, 1)

dataframe.toJSON
  .map(jsonString => s"  $jsonString")
  .withColumn("order", (count("value").over(window) % lit(2)) + lit(1))
  .withColumn("value", when(col("order").equalTo(1), concat(col("value"), lit(","))).otherwise(col("value")))
  .unionByName(Seq((0, "["), (3, "]")).toDF("order", "value"))
  .orderBy("order")
  .select("value")
  .coalesce(1)
  .write.text(destDir)

结论

您可以仅使用 spark 将 spark 数据帧编写为 json 数组。

然而,spark 是一个并行计算框架,因此执行一个命令并缩小到一个分区并不是它应该工作的方式。此外,由于您无法更改 spark 输出的文件名称,因此保存的文件将具有.txt扩展名(但里面是一个 json 数组)

最好保存您的数据框,.write.json(destDir)然后使用经典工具重新处理输出,而不是创建复杂的逻辑来使用 spark。


推荐阅读