首页 > 解决方案 > PySpark:使用选定的列或分区优化从 Delta 读取/加载

问题描述

我正在尝试将 Delta 中的数据加载到 pyspark 数据框中。

path_to_data = 's3://mybucket/daily_data/'
df = spark.read.format("delta").load(path_to_data)

现在基础数据按日期划分为

s3://mybucket/daily_data/
    dt=2020-06-12
    dt=2020-06-13
    ...
    dt=2020-06-22

鉴于以下情况,有没有办法将读取优化为 Dataframe:

  1. 只需要特定的日期范围
  2. 只需要列的子集

目前的方式,我试过是:

df.registerTempTable("my_table")
new_df = spark.sql("select col1,col2 from my_table where dt_col > '2020-06-20' ")
# dt_col is column in dataframe of timestamp dtype.

在上述状态下,Spark 是否需要加载整个数据,根据日期范围过滤数据,然后过滤所需的列?是否可以在 pyspark 读取中进行任何优化,以加载数据,因为它已经分区?

在线的东西:

df = spark.read.format("delta").load(path_to_data,cols_to_read=['col1','col2'])
or 
df = spark.read.format("delta").load(path_to_data,partitions=[...])

标签: pythonapache-sparkpysparkdelta-lake

解决方案


在您的情况下,不需要额外的步骤。Spark 将负责优化。dt由于当您尝试使用分区列dt作为过滤条件查询数据集时,您已经根据列对数据集进行了分区。Spark 仅从源数据集中加载与过滤条件匹配的数据子集,在您的情况下为dt > '2020-06-20'.

Spark 在内部进行基于优化的分区修剪。


推荐阅读