首页 > 解决方案 > 使用 Xarray 和 Dask 更快地重塑 mfdataset 文件

问题描述

我正在尝试将 Dask 与 Xarray 结合使用来加载从ecmwf下载的数据集。该数据存储在netcdf以月为单位沿时间轴拆分的文件中。它们是每个文件中 8 个变量的 3 小时步进样本。

我想在整个时间跨度内将这个数据集重新保存为每个纬度/经度索引的文件。所以从lat=all, lng=all, time=monthslat=1, lng=1, time=all

但是,我不确定如何从中获得更好的性能,它似乎比我目前预期的要慢 - 甚至只是预处理。感觉我必须以最佳方式做一些事情。

这是我使用的步骤和速度:

  1. 正在加载。这相当快。在使用集群的情况下,完整的数据集xarray.open_mfdataset在大约 60 秒内完成。我曾尝试加载chunks=dict(time=-1, latitude=1, longitude=1)以尝试让 Dask 知道这就是我计划访问数据的方式,但是 的块data_vars似乎保持不变并忽略了进入xarray.open_mfdataset.
import xarray as xr
from dask.distributed import Client

c = Client("scheduler:8786")
c.cluster

ds = xr.open_mfdataset(
    "/resource_data/ecmwf/era20c/grid-av/hour-step-3/*.nc",
    parallel=True,
    # chunks=dict(time=-1, latitude=1, longitude=1)
)
print(ds)
<xarray.Dataset>
Dimensions:    (longitude: 240, latitude: 121, time: 321664)
Coordinates:
  * longitude  (longitude) float32 0.0 1.5 3.0 4.5 ... 354.0 355.5 357.0 358.5
  * latitude   (latitude) float32 90.0 88.5 87.0 85.5 ... -87.0 -88.5 -90.0
  * time       (time) datetime64[ns] 1900-01-01 ... 2010-01-31T21:00:00
Data variables:
    mp1        (time, latitude, longitude) float32 dask.array<chunksize=(248, 121, 240), meta=np.ndarray>
    wind       (time, latitude, longitude) float32 dask.array<chunksize=(248, 121, 240), meta=np.ndarray>
    mwd        (time, latitude, longitude) float32 dask.array<chunksize=(248, 121, 240), meta=np.ndarray>
    dwi        (time, latitude, longitude) float32 dask.array<chunksize=(248, 121, 240), meta=np.ndarray>
    swh        (time, latitude, longitude) float32 dask.array<chunksize=(248, 121, 240), meta=np.ndarray>
    pp1d       (time, latitude, longitude) float32 dask.array<chunksize=(248, 121, 240), meta=np.ndarray>
    mp2        (time, latitude, longitude) float32 dask.array<chunksize=(248, 121, 240), meta=np.ndarray>
    mwp        (time, latitude, longitude) float32 dask.array<chunksize=(248, 121, 240), meta=np.ndarray>
Attributes:
    Conventions:  CF-1.6
    history:      2019-11-22 10:06:28 GMT by grib_to_netcdf-2.14.1
  1. 接下来,我尝试从数据集为xarray.save_mfdataset. 最初我打算尝试类似的东西Xarray.Dataset.groupby,但目前不支持多个项目(即ds.groupby(["latitude", "longitude"])),因此手动制作这些子集并保存路径xarray.save_mfdataset如下。这很慢:
lat_lngs = list(itertools.product(ds.latitude.values, ds.longitude.values))

datasets = []
paths = []

for lat, lng in lat_lngs:
    save_path = os.path.join(out_dir, f"lat_{lat}_lng_{lng}.h5")
    dataset = ds.sel(latitude=lat, longitude=lng)

    datasets.append(dataset)
    paths.append(save_path)

os.makedirs(out_dir, exist_ok=True)

delayed = xr.save_mfdataset(
    datasets,
    paths,
    engine="h5netcdf",
    compute=False,
)

c.persist(delayed, optimize_graph=True)

收集路径和数据集的本机 python for 循环大约需要 40 秒,但在撰写本文时xr.save_mfdataset已经运行了 40分钟,这甚至还没有开始“工作”(我相信这应该在调用时发生之后坚持)。

关于我应该如何优化它的任何提示?或者这似乎是预期/合理的?

标签: pythondaskpython-xarraydask-distributed

解决方案


推荐阅读