python - 与 spark 相比,dask 的`read_parquet` 真的很慢
问题描述
我过去有过使用 Spark 的经验,老实说,我主要来自 python 背景,这是一个很大的飞跃。当我看到dask时,我认为这对于分布式计算来说是一个更好的解决方案,但是我似乎遇到了简单地读取 parquet 文件的问题。
我有一个 s3 存储桶,比如说s3://some-bucket/
。在其中,我将镶木地板文件保存为 hive 分区,如下所示。
├── year=2019
└── month=01
└── month=01
└── ....
└── year=2020
└── month=01
└── day=01
├── datasource=browser
└── ...
└── datasource=iphone
└── part2.parquet
└── part2.parquet
└── ...
没有 _metadata 或 _common_metadata 文件。
dask
为了演示与我相比阅读这些内容的速度有多慢,Spark
我可以提供一个示例。在 Spark 中,以下操作大约需要 90 秒:
df = spark.read.parquet("s3://some-bucket/year=2020/month=04/")
df.createOrReplaceTempView("df")
res = spark.sql("SELECT column1, SUM(column2) FROM df GROUP BY column1")
res = res.toPandas() # convert to pandas to be fair to dask, but we know this isn't fast in spark
dask 中完全相同的事情需要大约 800 秒:
cluster = YarnCluster()
# Connect to the cluster
client = Client(cluster)
cluster.scale(10)
df = dd.read_parquet("s3://some-bucket/year=2020/month=04/*/*/*.parquet",
columns=["column1", "column2"],
engine="pyarrow",
gather_statistics=False,
)
res = df.groupby('column1').column2.sum().compute()
使用"s3://some-bucket/year=2020/month=04/"
而不是"s3://some-bucket/year=2020/month=04/*/*/*.parquet"
,大约需要 2100 秒。
我已经尝试过的一件事是读取一小块镶木地板的元数据,提取 pyarrow 模式,并将其作为 kwarg 与validate_schema=False
. 像这样:
import pyarrow.parquet as pq
import s3fs
s3 = s3fs.S3FileSystem()
# we pick a small part of the dataset
ds = pq.ParquetDataset(
"s3://some-bucket/year=2020/month=04/day=01/datasource=iphone/part1.parquet",
filesystem=s3)
# read in the table and write it out to a temp parquet
tbl = ds.read()
pq.write_table(tbl, "tmp.parquet")
# read in the metadata using pyarrow and extract the pyarrow._parquet.Schema
pa_metadata = pq.read_metadata("tmp.parquet")
pa_schema = pa_metadata.schema
df_dask = dd.read_parquet("s3://some-bucket/year=2020/month=04/day=01/*/*.parquet",
columns=["column1", "column2"],
engine="pyarrow",
gather_statistics=False,
dataset=dict(validate_schema=False, schema=pa_schema)
)
使用这种方法,只看一天的分区,我看到显着的加速(~4x)。只要我查看一个月的数据,我的工作人员就会被杀死(我假设是因为 dask 试图将太多数据读入一个特定的节点?)。
不幸的是,我无法更改我的数据结构。读完这篇文章后,我意识到如果我有一个_metadata
or_common_metadata
文件,那么我会看到显着的加速。但是,这对我来说是不可能的
那么,在这个特定的用例中,为什么 dask 比 Spark 慢这么多呢?更具体地说,我能做些什么来加快在 dask 中读取镶木地板文件的速度吗?
额外细节
* 共有超过 1000 列。* 使用 Java 保存数据(我们无法更改) * 包版本 - dask==2.15.0
、dask-yarn==0.8.1
、distributed==2.15.2
、pyarrow==0.17.0
* 每个day=*
通常在硬盘上约为 6.4gb。最大的datasource=*
分区约为 1.5GB。单个 parquet 文件在 13MB 和 150MB 之间 * 我们尝试fastparquet
作为pyarrow
引擎的替代方案,但这比pyarrow
解决方案
推荐阅读
- c# - 如果直到运行时才知道对象的名称,如何引用对象的属性?
- c# - 如何将列表字符串传递给 TempData Asp.Net
- javascript - 计算 typeof 值为 'object' 的属性在另一个对象中的数量
- angular - Angular 为什么我们需要 markForCheck 当 detectChanges 也有效时
- dart - 在 Dart 中禁用 print()
- c# - 如何避免 NGEN?
- ios - -thingWithContentsOfFile:与 NSPropertyListSerialization
- android - 如果背景是 ?android:attr/selectableItemBackground(AlertDialog 中的 ListView),如何使 View 看起来像是被编程选择
- vb.net - 如何在 vb.net 中使用数据填充 XML
- c++ - C++ - 将数字添加到列表中?(如 Python)