首页 > 解决方案 > 有没有办法使用名称不同于 part* 的 scala 在 spark 3.0.1 中导出 csv 或其他文件?

问题描述

我使用 scala 在 spark 的二维上创建了一个立方体。数据来自两个不同的数据帧。名称是“borrowersTable”和“loansTable”。它们是使用“createOrReplaceTempView”选项创建的,因此可以对它们运行 sql 查询。目标是在两个维度(性别和部门)上创建多维数据集图书馆借书的总数。使用命令

val cube=spark.sql("""
    select 
    borrowersTable.department,borrowersTable.gender,count(loansTable.bibno)
    from borrowersTable,loansTable
    where borrowersTable.bid=loansTable.bid
    group by borrowersTable.gender,borrowersTable.department with cube;
""")

我创建了具有以下结果的多维数据集:

立方体图像

然后使用命令

cube.write.format("csv").save("file:///....../data/cube")

Spark 创建了一个名为 cube 的文件夹,其中包含 34 个名为 part*.csv 的文件,其中包括部门、性别和贷款总和(每个分组依据)的列。

这里的目标是以这种方式创建采用前两列(属性)名称的文件:对于 GroupBy (Attr1, Attr2),文件应命名为 Attr1_Attr2。

例如,对于 (Economics, M),文件应命名为 Economics_M。对于 (Mathematics, null) 它应该是Mathematics_null 等等。任何帮助,将不胜感激。

标签: scalaapache-sparkexport-to-csv

解决方案


当您调用df.write.format("...").save("...")每个 Spark 执行程序时,它会将其保存的分区保存到相应的 part* 文件中。这是存储和加载大文件的机制,您无法更改它。但是,您可以尝试以下在您的情况下效果更好的替代方案:

  1. 分区:
cube
  .write
  .partitionBy("department", "gender")
  .format("csv")
  .save("file:///....../data/cube")

这将创建子文件夹,其名称类似于department=Physics/gender=M其中仍包含 part* 文件。此结构稍后可以加载回 Spark 并用于分区列的有效连接。

  1. 收集
val csvRows = cube
  .collect()
  .foreach {
    case Row(department: String, gender: String, _) => 
      // just the simple way to write CSV, you can use any CSV lib here as well
      Files.write(Paths.get(s"$department_$gender.csv"), s"$department,$gender".getBytes(StandardCharsets.UTF_8))
  }

如果您打电话给collect()您,您会在驱动程序端收到您的数据帧Array[Row],然后您可以随心所欲地使用它。这种方法的重要限制是您的数据帧应该适合驱动程序的内存。


推荐阅读