python - 将 netcdfs 转换为 zarr 并重新分块时 CPU 利用率低
问题描述
我正在将数据从 netcdf 传输和重新分块到 zarr。该过程很慢,并且没有使用太多的 CPU。我尝试了几种不同的配置,有时它似乎做得稍微好一点,但效果不佳。有没有人有任何提示可以更有效地运行?
最后一次尝试(以及之前的一些尝试,可能是所有尝试)(使用单机、分布式调度程序和使用线程)日志给出了以下消息:
Distributed.core - 信息 - 事件循环在 Worker 中无响应 10.05 秒。这通常是由长时间运行的 GIL 持有函数或移动大块数据引起的。
以前我遇到过内存用完的错误,所以我正在使用下面的“stepwise_to_zarr”函数分段编写 zarr:
def stepwise_to_zarr(dataset, step_dim, step_size, chunks, out_loc, group):
start = dataset[step_dim].min()
end = dataset[step_dim].max()
iis = np.arange(start, end, step_size)
if end > iis[-1]:
iis = np.append(iis, end)
lon=dataset.get_index(step_dim)
first = True
failures = []
for i in range(1,len(iis)):
lower, upper = (iis[i-1], iis[i])
if upper >= end:
lon_list= [l for l in lon if lower <= l <= upper]
else:
lon_list= [l for l in lon if lower <= l < upper]
sub = dataset.sel(longitude=lon_list)
rechunked_sub = sub.chunk(chunks)
write_sync=zarr.ThreadSynchronizer()
if first:
rechunked_sub.to_zarr(out_loc, group=group,
consolidated=True, synchronizer=write_sync, mode="w")
first = False
else:
rechunked_sub.to_zarr(out_loc, group=group,
consolidated=True, synchronizer=write_sync, append_dim=step_dim)
chunks = {'time':8760, 'latitude':21, 'longitude':20}
ds = xr.open_mfdataset("path to data", parallel=True, combine="by_coords")
stepwise_to_zarr(ds, step_size=10, step_dim="longitude",
chunks=chunks, out_loc="path to output", group="group name")
在上图中,利用率从约 6% 下降到约 0.5% 似乎与完成 10 度纬度的第一批“批次”相吻合。
背景资料:
- 我正在使用 32 个 vCPU 和 256 GB 内存的单个 GCE 实例。
- 数据大约 600 GB,分布在大约 150 个 netcdf 文件中。
- 数据在 GCS 中,我正在使用 Cloud Storage FUSE 读取和写入数据。
- 我将数据从块大小重新分块:{'time':1,'latitude':521,'longitude':1440} 到块大小:{'time':8760,'latitude':21,'longitude':20}
我努力了:
- 使用默认的多处理调度程序
- 对单机(https://docs.dask.org/en/latest/setup/single-distributed.html)使用分布式调度程序,processs=True 和 processes=False。
- 分布式调度程序和默认的多处理调度程序,同时还设置环境变量以避免过度订阅线程,如下所示:
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1
解决方案
我最终通过写入带有块的中间 Zarr 存储解决了我的问题:{'time':8760, 'latitude':260, 'longitude':360}。这进展很快,尽管 cpu 资源仅在相对较小的部分工作中得到充分利用。然后,我使用问题中描述的逐步过程的修改版本读取了这个中间 zarr 并存储在最终的分块中。这给出了可接受的性能,尽管并不理想。
这是代码:
def stepwise_to_zarr(dataset, step_dim, step_size, encoding, out_loc, group, include_end=True):
start = dataset[step_dim].min()
end = dataset[step_dim].max()
iis = np.arange(start, end, step_size)
if end > iis[-1]:
iis = np.append(iis, end)
lon=dataset.get_index(step_dim)
first = True
failures = []
for i in range(1,len(iis)):
lower, upper = (iis[i-1], iis[i])
if upper >= end and include_end:
lon_list= [l for l in lon if lower <= l <= upper]
else:
lon_list= [l for l in lon if lower <= l < upper]
sub = dataset.sel(longitude=lon_list)
write_sync=zarr.ThreadSynchronizer()
if first:
sub_write=sub.to_zarr(output_loc,
group=varname,
consolidated=True,
synchronizer=write_sync,
encoding=encoding,
mode="w", compute=False)
first = False
else:
sub_write=sub.to_zarr(output_loc,
group=varname,
consolidated=True,
synchronizer=write_sync,
append_dim=step_dim,
compute=False)
sub_write.compute(retries=2)
z = xr.open_zarr(input_loc, group=groupname)
new_chunks = {'time':8760, 'latitude':21, 'longitude':20}
z_rechunked=z.chunk(new_chunks)
#Workaround to avoid:NotImplementedError: Specified zarr chunks (8760, 260, 360) would #overlap multiple dask chunks
#See https://github.com/pydata/xarray/issues/2300
encoding = {}
for v in ['var1', 'var2', 'var3']:
encoding.update({v:z[v].encoding.copy()})
encoding[v]["chunks"]=(96408, 21, 20)
stepwise_to_zarr(z_rechunked, "longitude", 60, encoding, output_loc, group=groupname)
注意我必须覆盖编码才能重新分块 zarrs。
这个过程有效,但有点麻烦。我之所以这样做,是因为我没有听说过 rechunker。下次我重新分块时,我会尝试重新分块来解决这个问题。
推荐阅读
- php - 如何使用 MySQL 中的嵌套查询选择和插入直到订购数量完成
- javascript - Express Body Parser - 插入显示未定义的嵌套数组对象
- python - 如果它们在特定时间范围内,则按条件替换行值
- data-annotations - 从 LabelBox 工具导出分段标签
- c# - c# - 使用 httpwebrequest 发送文本
- html - 冻结html表格中的两行标题
- android-studio - 从图库中选择的图像无法在 android studio 中转换为位图
- python - 如何使用 plotly 仅显示列表中的日期
- sql - 通过 ODBC 驱动连接到 ELK Rollup Index
- javascript - 如何通过 div 单击按钮?(指针事件不起作用)