首页 > 解决方案 > Spark 1.6:将数据帧存储到 hdfs 中的多个 csv 文件中(按 id 分区)

问题描述

我正在尝试通过 id 将 dataFrame 保存到 csv 分区中,因为我使用的是 spark 1.6 和 scala。函数 partitionBy("id") 没有给我正确的结果。

我的代码在这里:

validDf.write
       .partitionBy("id")
       .format("com.databricks.spark.csv")
       .option("header", "true")
       .option("delimiter", ";")
       .mode("overwrite")       
       .save("path_hdfs_csv")

My Dataframe looks like  :
-----------------------------------------
| ID        |  NAME       |  STATUS     |
-----------------------------------------
|     1     |     N1      |     S1      |
|     2     |     N2      |     S2      |
|     3     |     N3      |     S1      |
|     4     |     N4      |     S3      |
|     5     |     N5      |     S2      |
-----------------------------------------

此代码不基于列 ID 创建 3 个 csv 默认分区(part_0、part_1、part_2)。

我期望的是:为每个 id 获取子目录或分区。有什么帮助吗?

标签: scalaapache-sparkhadoophadoop-streaming

解决方案


spark1.6(或所有低于 2 的 spark 版本)中的 Spark-csv 不支持分区。
您的代码适用于 spark > 2.0.0。

对于您的 spark 版本,您需要先准备 csv 并将其保存为文本(分区适用于spark-text):

import org.apache.spark.sql.functions.{col,concat_ws}
val key = col("ID")
val concat_col = concat_ws(",",df.columns.map(c=>col(c)):_*) // concat cols to one col
val final_df = df.select(col("ID"),concat_col) // dataframe with 2 columns: id and string 
final_df.write.partitionBy("ID").text("path_hdfs_csv") //save to hdfs

推荐阅读