首页 > 解决方案 > 与 spark 相比,dask 的`read_parquet` 真的很慢

问题描述

我过去有过使用 Spark 的经验,老实说,我主要来自 python 背景,这是一个很大的飞跃。当我看到dask时,我认为这对于分布式计算来说是一个更好的解决方案,但是我似乎遇到了简单地读取 parquet 文件的问题。

我有一个 s3 存储桶,比如说s3://some-bucket/。在其中,我将镶木地板文件保存为 hive 分区,如下所示。

├── year=2019
    └── month=01
    └── month=01
    └── ....     
└── year=2020
    └── month=01
        └── day=01
            ├── datasource=browser
                └── ...
            └── datasource=iphone
                └── part2.parquet
                └── part2.parquet
        └── ...

没有 _metadata 或 _common_metadata 文件

dask为了演示与我相比阅读这些内容的速度有多慢,Spark我可以提供一个示例。在 Spark 中,以下操作大约需要 90 秒:

df = spark.read.parquet("s3://some-bucket/year=2020/month=04/")


df.createOrReplaceTempView("df")

res = spark.sql("SELECT column1, SUM(column2) FROM df GROUP BY column1")

res = res.toPandas()  # convert to pandas to be fair to dask, but we know this isn't fast in spark

dask 中完全相同的事情需要大约 800 秒:

cluster = YarnCluster()

# Connect to the cluster
client = Client(cluster)

cluster.scale(10)

df = dd.read_parquet("s3://some-bucket/year=2020/month=04/*/*/*.parquet",
                     columns=["column1", "column2"], 
                     engine="pyarrow",
                     gather_statistics=False,
)

res = df.groupby('column1').column2.sum().compute()

使用"s3://some-bucket/year=2020/month=04/"而不是"s3://some-bucket/year=2020/month=04/*/*/*.parquet",大约需要 2100 秒。

我已经尝试过的一件事是读取一小块镶木地板的元数据,提取 pyarrow 模式,并将其作为 kwarg 与validate_schema=False. 像这样:

import pyarrow.parquet as pq
import s3fs

s3 = s3fs.S3FileSystem()

# we pick a small part of the dataset
ds = pq.ParquetDataset(
    "s3://some-bucket/year=2020/month=04/day=01/datasource=iphone/part1.parquet",
filesystem=s3)

# read in the table and write it out to a temp parquet
tbl = ds.read()
pq.write_table(tbl, "tmp.parquet")

# read in the metadata using pyarrow and extract the pyarrow._parquet.Schema
pa_metadata = pq.read_metadata("tmp.parquet")
pa_schema = pa_metadata.schema

df_dask = dd.read_parquet("s3://some-bucket/year=2020/month=04/day=01/*/*.parquet",
                     columns=["column1", "column2"], 
                     engine="pyarrow",
                     gather_statistics=False,
                     dataset=dict(validate_schema=False, schema=pa_schema)
)

使用这种方法,只看一天的分区,我看到显着的加速(~4x)。只要我查看一个月的数据,我的工作人员就会被杀死(我假设是因为 dask 试图将太多数据读入一个特定的节点?)。

不幸的是,我无法更改我的数据结构。读完这篇文章后,我意识到如果我有一个_metadataor_common_metadata文件,那么我会看到显着的加速。但是,这对我来说是不可能的

那么,在这个特定的用例中,为什么 dask 比 Spark 慢这么多呢?更具体地说,我能做些什么来加快在 dask 中读取镶木地板文件的速度吗?

额外细节 * 共有超过 1000 列。* 使用 Java 保存数据(我们无法更改) * 包版本 - dask==2.15.0dask-yarn==0.8.1distributed==2.15.2pyarrow==0.17.0 * 每个day=*通常在硬盘上约为 6.4gb。最大的datasource=*分区约为 1.5GB。单个 parquet 文件在 13MB 和 150MB 之间 * 我们尝试fastparquet作为pyarrow引擎的替代方案,但这比pyarrow

标签: pythonapache-sparkpysparkdaskparquet

解决方案


推荐阅读