json - 如何将数据帧作为 json 数组写入文件?(斯卡拉)
问题描述
我有一个数据框,我想将它作为 json 数组写入 scala 中的单个文件中。
尝试1:
dataframe.coalesce(1).write.format("json").save(destDir)
输出1: 每行一行,其中每一行是一个json
尝试2:
dataframe.toJSON.coalesce(1).write.format("json").save(destDir)
输出 2: 与输出 1 相同,但每行都有一个奇怪的 json {value: {key1:value1, key2:value2, ... }
尝试 3(使用 java PrintWriter 以字符串形式写入):
printWriter.write(dataframe.toJSON.collect.mkString("[",",","]"))
输出3:
它将一个 json 数组写入本地路径。如果路径是用于 hdfs 的,它会显示 FileNotFound,即使路径 + 文件存在。
解决方案
要将数据帧写为 json 数组,首先将数据帧转换为 json 字符串,然后转换这些字符串,以便每一行都是未来 json 文件中的一行,然后使用text
而不是json
分析
要将数据帧写入 json,您可以从.toJSON
尝试 2 和 3 中的方法开始:
val rawJson = dataframe.toJSON
现在您有一个数据框,其中一列value
包含行的 json 表示形式为String
.
要将此数据框转换为每行代表未来文件的一行的数据框,您需要:
- 添加一个包含
[
作为数据框第一行的新行 - 将逗号添加到代表您的 json 数据的所有行
- 除了带有 json 数据的最后一行
- 添加一个包含
]
作为数据框最后一行的新行
如您所见,“第一个”和“最后一个”之类的概念在您的情况下很重要,因此您需要在数据框中构建行的顺序。你可以这样关联它:
+-------+--------------------+------------+
| order | row | value |
+-------+--------------------+------------+
| 0 | first row | "[" |
| 1 | row with json | " {...}," |
| 1 | row with json | " {...}," |
| ... | ... | ... |
| 1 | row with json | " {...}," |
| 1 | row with json | " {...}," |
| 2 | last row with json | " {...}" |
| 3 | last row | "]" |
+-------+--------------------+------------+
首先,您可以将带有 json 的最后一行与其他行区分开来。为此,您可以使用窗口函数。您计算包含当前行和下一行的窗口中的行数,这意味着您将每一行与 2 相关联,除了最后一行没有下一行,因此您与 1 相关联。
val window = Window.rowsBetween(Window.currentRow, 1)
val jsonWindow = rawJson.withColumn("order", count("value").over(window))
但是,您希望最后一行在“order”列中有 2,而其他行在“order”列中有 1。您可以使用 modulo ( %
) 函数来实现:
val jsonRowsWithOrder = jsonWindow.withColumn("order", (col("order") % lit(2)) + 1)
然后将逗号添加到除最后一行之外的所有行,这意味着您将逗号添加到列“order”设置为 1 的所有行:
val jsonRowsWithCommas = jsonRowsWithOrder.withColumn("value", when(col("order").equalTo(1), concat(col("value"), lit(","))).otherwise(col("value")))
最终文件中的那些行将被缩进,所以你缩进它们:
val indentedJsonRows = jsonRowsWithCommas.withColumn("value", concat(lit(" "), col("value")))
您添加第一行和最后一行,其中包含左方括号和右方括号:
val unorderedRows = indentedJsonRows.unionByName(Seq((0, "["), (3, "]")).toDF("order", "value"))
您订购它:
val orderedRows = unorderedRows.orderBy("order").drop("order")
您合并后只有一个分区,因为最后只需要一个文件:
partitionedRows = orderedRows.coalesce(1)
你把它写成文本:
partitionedRows.write.text(destDir)
你就完成了!
完整的解决方案
这是完整的解决方案,带有导入。此解决方案适用于 spark 2.3(使用 spark 3.0 测试):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val window = Window.rowsBetween(Window.currentRow, 1)
dataframe.toJSON
.map(jsonString => s" $jsonString")
.withColumn("order", (count("value").over(window) % lit(2)) + lit(1))
.withColumn("value", when(col("order").equalTo(1), concat(col("value"), lit(","))).otherwise(col("value")))
.unionByName(Seq((0, "["), (3, "]")).toDF("order", "value"))
.orderBy("order")
.select("value")
.coalesce(1)
.write.text(destDir)
结论
您可以仅使用 spark 将 spark 数据帧编写为 json 数组。
然而,spark 是一个并行计算框架,因此执行一个命令并缩小到一个分区并不是它应该工作的方式。此外,由于您无法更改 spark 输出的文件名称,因此保存的文件将具有.txt
扩展名(但里面是一个 json 数组)
最好保存您的数据框,.write.json(destDir)
然后使用经典工具重新处理输出,而不是创建复杂的逻辑来使用 spark。
推荐阅读
- python - 最大化正在运行的 Windows 应用程序
- python - 当我在python中正确编写代码时,为什么控制台会给出错误的输出?
- html - 在 TIBCO Jaspersoft Studio 中添加分页符
- pyspark - 在 pyspark 中对大量列进行累积求和的优化方法
- r - R中的Kruskal.test
- python - 带有 where 子句的 Pandas read_sql 使用“in”
- javascript - Javascript - 克隆的元素不响应点击
- git - Git:“系统找不到指定的路径。” git lfs 迁移后
- python - Python 仅使用列表推导动态计算没有重复项的列表
- c# - 将 PowerShell 中的加密/解密函数转换为 PHP (openssl_)