apache-spark - 读取没有分区列名的分区列
问题描述
我们将存储在 s3 中的数据按以下结构分区:
bucket/directory/table/aaaa/bb/cc/dd/
哪里aaaa
是年,bb
是月,cc
是日,dd
是小时。
如您所见,路径中没有分区键 ( year=aaaa
, month=bb
, day=cc
, hour=dd)
.
结果,当我将表读入 Spark 时,没有year
、或列。month
day
hour
无论如何我可以将表读入 Spark 并包含分区列而不包含:
- 更改 s3 中的路径名
- 在循环中迭代每个分区值并将每个分区一个一个地读取到 Spark 中(这是一个巨大的表,这需要很长时间并且显然不是最佳的)。
解决方案
Spark 无法发现未在路径中编码的分区partition_name=value
,因此您必须创建它们。
在将路径加载bucket/directory/table/aaaa/bb/cc/dd/
到 DataFrame 中后,您可以从使用input_file_name()
.
首先,使用分隔符拆分文件名路径,/
然后从最后 4 个元素创建列:
from pyspark.sql import functions as F
df1 = df.withColumn("date_partitions", F.slice(F.split(F.input_file_name(), "/"), -5, 4)) \
.withColumn("year", F.col("date_partitions").getItem(0)) \
.withColumn("month", F.col("date_partitions").getItem(1)) \
.withColumn("day", F.col("date_partitions").getItem(2)) \
.withColumn("hour", F.col("date_partitions").getItem(3)) \
.drop("data_partitions")
例子:
data = [
(1, 2, "bucket/directory/table/2021/01/10/14/"),
(3, 4, "bucket/directory/table/2021/01/11/18/")
]
df = spark.createDataFrame(data, ["a", "b", "input_file_name"])
给出:
#+---+---+-------------------------------------+----+-----+---+----+
#|a |b |input_file_name |year|month|day|hour|
#+---+---+-------------------------------------+----+-----+---+----+
#|1 |2 |bucket/directory/table/2021/01/10/14/|2021|01 |10 |14 |
#|3 |4 |bucket/directory/table/2021/01/11/18/|2021|01 |11 |18 |
#+---+---+-------------------------------------+----+-----+---+----+
推荐阅读
- android - 将 Android 构建 gradle 版本更新到 3.2.1 导致错误
- c - 是否有任何方法可以更新和重新启动服务器,使其套接字保持“暂停”状态?
- html - 鼠标视差对响应式全视口背景的影响
- google-cloud-platform - 通过内部 IP 地址从 Google Cloud Function 访问 VM 实例
- sdk - branch.io 网络应用归因 Javascript SDK
- javascript - 在 Adobe Captivate 中无法运行的神秘 While 循环,如何修复?
- java - 编程:IF风格
- c# - 会话变量的内存占用是多少?
- cordova - 在Android中打开后应用程序立即崩溃
- swift - 一旦事件日期过去,获取在 TableViewControllers 之间传递的信息 - Swift