首页 > 解决方案 > pyspark:如何按年/月/日/小时子目录编写数据帧分区?

问题描述

我有制表符分隔的数据(csv 文件),如下所示:

201911240130 a
201911250132 b
201911250143 c
201911250223 z
201911250224 d
...

我想按年、月、日、小时编写目录组。

hdfs://dest/2019/11/24/01/xxxx.csv
hdfs://dest/2019/11/25/01/xxxx.csv
hdfs://dest/2019/11/25/02/xxxx.csv

如何按 yyyy/mm/dd/hh 写入分区?

标签: apache-sparkpysparkapache-spark-sqlpyspark-sql

解决方案


DataFrameWriter 中已经有partitionBy它可以完全满足您的需要,而且要简单得多。此外,还有从时间戳中提取日期部分的功能。

这是您可以考虑的另一种解决方案。

由于您的 CSV 没有标题,因此您可以在加载时应用自定义标题,这样以后可以轻松操作列:

custom_header = "timestamp\tvalue"
schema = StructType()
col_names = custom_header.split("\t")
for c in col_names:
    schema.add(StructField(c.strip(), StringType()))

df = spark.read.csv("hdfs://sample.csv", header=False, sep="\t", schema=schema)

现在,从列中创建列year, month, day,如下所示:hourtimestamp

df_final = df.withColumn("timestamp", to_timestamp(col("timestamp"), 'yyyyMMddHHmm')) \
           .withColumn("year", date_format(col("timestamp"), "yyyy")) \
           .withColumn("month", date_format(col("timestamp"), "MM")) \
           .withColumn("day", date_format(col("timestamp"), "dd")) \
           .withColumn("hour", date_format(col("timestamp"), "HH")) \
           .drop("timestamp")

df_final.show(truncate=False)

+-----+----+-----+---+----+
|value|year|month|day|hour|
+-----+----+-----+---+----+
|a    |2019|11   |24 |01  |
|b    |2019|11   |25 |01  |
|c    |2019|11   |25 |01  |
|z    |2019|11   |25 |02  |
|d    |2019|11   |25 |02  |
+-----+----+-----+---+----+

partitionBy最后,使用如下方式将 DF 写入目标路径:

df_final.write.partitionBy("year", "month", "day", "hour") \
    .mode("overwrite") \
    .option("header", "false").option("sep", "\t") \
    .csv("hdfs://dest/")

分区将在/dest/文件夹下创建。


推荐阅读