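python - How do I (deterministically) cache functions that return Dask dataframes, in a chunked manner?
Problem description
I would like to deterministically cache arbitrary functions that return Dask dataframes to parquet files, in a chunked manner. I say "deterministically" to distinguish this from Dask's opportunistic caching mechanism, which is nice but not what I am after.
So far I have this decorator, which implements the caching mechanism I want, but in a non-chunked manner: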
import base64
from functools import wraps
import hashlib
import json
from pathlib import Path

import dask.dataframe as dd


def ezhash(o):
    # Deterministic key: JSON (sorted keys) -> MD5 -> URL-safe base64
    o = json.dumps(o, sort_keys=True)
    o = hashlib.md5(o.encode('utf-8')).digest()
    o = base64.urlsafe_b64encode(o).decode('ascii')
    return o


def cached_compute(enabled=True, engine='pyarrow', write_kwargs={}, read_kwargs={}):
    def _cached_compute(f):
        """
        Either compute and cache function f, or get its cached value
        operates out-of-core
        adds a named parameter input_state to f

        :param f: The function to cache
            f must return a (lazy) dask container containing parquet-serializable dtypes
            if any column is not parquet-serializable (in particular, array objects), you get errors like this:
                ValueError: Error converting column "page_ssb" to bytes using encoding UTF8.
                Original error: bad argument type for built-in operation
        :param input_state: Optional input state to cache, instead of caching f's arguments
            if input_state is not given: compute key from f's name and arguments
            if input_state is given: compute key from f's name and input_state
        """
        @wraps(f)
        def g(*args, input_state=None, **kwargs):
            if not enabled:
                return f(*args, **kwargs)
            if input_state is None:
                key = ezhash({'name': f.__name__, 'args': args, 'kwargs': kwargs})
            else:
                key = ezhash({'name': f.__name__, 'input_state': input_state})
            key_path = Path(f'parquet_cache/{key}/')
            cache_keys = [path.name for path in Path('parquet_cache').glob('*')]
            if key not in cache_keys:
                # Cache miss: compute the whole result and write it out
                print(f'caching to {key}')
                res = f(*args, **kwargs)
                res.to_parquet(key_path, engine=engine, **write_kwargs)
            # Always return the result as read back from the cache
            print(f'reading cache {key}')
            res = dd.read_parquet(key_path, engine=engine, **read_kwargs)
            return key, res
        return g
    return _cached_compute
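A note on the key derivation used by the decorator above. The following is a small, hedged check of how `ezhash` behaves; it copies `ezhash` from the code above and otherwise uses only the standard library. The example inputs are made up for illustration.

```python
import base64
import hashlib
import json


def ezhash(o):
    """Same key derivation as above: JSON (sorted keys) -> MD5 -> URL-safe base64."""
    o = json.dumps(o, sort_keys=True)
    o = hashlib.md5(o.encode('utf-8')).digest()
    return base64.urlsafe_b64encode(o).decode('ascii')


# Keyword order does not affect the key, because of sort_keys=True.
k1 = ezhash({'name': 'f', 'args': [1, 2], 'kwargs': {'a': 1, 'b': 2}})
k2 = ezhash({'name': 'f', 'args': [1, 2], 'kwargs': {'b': 2, 'a': 1}})
print(k1 == k2)  # True

# Tuples and lists serialize identically in JSON, so they share a key.
print(ezhash({'args': (1, 2)}) == ezhash({'args': [1, 2]}))  # True

# Arguments that json cannot serialize (for example a set) raise TypeError.
try:
    ezhash({'args': {1, 2}})
except TypeError as e:
    print('cannot derive a key from this argument:', e)
```

The last case suggests that a function taking, say, a dataframe as an argument cannot be keyed from its arguments directly and would need `input_state` to be passed instead.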
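The decorator caches the overall result of a function that returns a Dask dataframe just fine. However, I am now dealing with a function that performs network calls, which means it can fail intermittently. I would now like the caching mechanism above to cache results per row or per chunk, so that re-running the function loads the cached rows/chunks but retries the function on the failed rows/chunks (and caches those results afterwards).
Does anyone have any ideas on how to do this?
2021-10-05 Update: added expected input and output
Append the following test code to the code above: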
import pandas as pd
import random


def network_call(row):
    """Simulate a network call"""
    if random.random() < 0.9:
        # success 90%
        response = sum(row.tolist())
    else:
        # fail 10%
        response = None
    row['response'] = response
    return row


def run_example(cache_enabled, max_runs=5):
    def expensive_function(input_data):
        ddf = dd.from_pandas(pd.DataFrame(input_data), npartitions=4)
        ddf = ddf.apply(network_call, axis=1, meta={k: 'int64' for k in [*input_data.keys(), 'response']})
        return ddf

    if cache_enabled:
        # apply caching decorator
        expensive_function = cached_compute(write_kwargs={'schema': 'infer'})(expensive_function)

    input_data = {
        'col_1': list(range(  0, 100)),
        'col_2': list(range(100, 200)),
        'col_3': list(range(200, 300)),
    }

    prev_success = 0
    for i_run in range(max_runs):
        if cache_enabled:
            _, ddf = expensive_function(input_data)
        else:
            ddf = expensive_function(input_data)
        # A row succeeded if its response is not missing
        n_success = int(ddf['response'].notna().sum().compute())
        print(f'''Run #{i_run + 1:>2}: Total successes {n_success:>3}/100 (+{n_success - prev_success:>2})''')
        prev_success = n_success


print('''Running with cache disabled''')
run_example(cache_enabled=False)
print('''\nRunning with cache enabled''')
run_example(cache_enabled=True)
Running it:
$ python test_.py
Running with cache disabled
Run # 1: Total successes 92/100 (+92)
Run # 2: Total successes 92/100 (+ 0)
Run # 3: Total successes 97/100 (+ 5)
Run # 4: Total successes 92/100 (+-5)
Run # 5: Total successes 90/100 (+-2)
Running with cache enabled
caching to 9Egrh7PYPHO2yoGMnNCBpg==
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 1: Total successes 89/100 (+89)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 2: Total successes 89/100 (+ 0)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 3: Total successes 89/100 (+ 0)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 4: Total successes 89/100 (+ 0)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 5: Total successes 89/100 (+ 0)
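Observe how the caching mechanism currently loads the result of the first run regardless of success, and how it does not re-run the failed computations.
A successful answer to this question would produce output like this: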
caching to 9Egrh7PYPHO2yoGMnNCBpg==
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 1: Total successes 89/100 (+89)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 2: Total successes 98/100 (+ 9)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 3: Total successes 100/100 (+ 2)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 4: Total successes 100/100 (+ 0)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 5: Total successes 100/100 (+ 0)
Solution
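One possible direction, offered as a minimal sketch rather than a tested Dask solution: cache each chunk under its own key, write a cache file only when the chunk succeeded, and on later runs recompute only the chunks that have no cache file. The names `chunk_key`, `cached_by_chunk` and `flaky_sum` are hypothetical. To stay dependency-free, the sketch uses plain lists and JSON files instead of Dask partitions and parquet; in the Dask setting the same logic would presumably be applied per partition, with one parquet file per partition, and that part is not shown here.

```python
import hashlib
import json
import random
import tempfile
from pathlib import Path


def chunk_key(name, chunk):
    """Deterministic key for one chunk: hash of the function name and chunk contents."""
    payload = json.dumps({'name': name, 'chunk': chunk}, sort_keys=True)
    return hashlib.md5(payload.encode('utf-8')).hexdigest()


def cached_by_chunk(f, chunks, cache_dir):
    """
    Apply f to each chunk, caching successful results in one file per chunk.

    A chunk counts as failed when f raises or returns None. Failed chunks are
    not cached, so the next call retries only those. Returns one entry per
    chunk, with None where the chunk failed in this call.
    """
    cache_dir = Path(cache_dir)
    cache_dir.mkdir(parents=True, exist_ok=True)
    results = []
    for chunk in chunks:
        path = cache_dir / f'{chunk_key(f.__name__, chunk)}.json'
        if path.exists():
            # Cache hit: load the stored result, do not call f again
            results.append(json.loads(path.read_text()))
            continue
        try:
            res = f(chunk)
        except Exception:
            res = None
        if res is not None:
            # Write to a temporary file and rename it, so an interrupted
            # write does not leave a half-written cache entry behind
            tmp = path.with_suffix('.tmp')
            tmp.write_text(json.dumps(res))
            tmp.replace(path)
        results.append(res)
    return results


def flaky_sum(chunk):
    """Hypothetical stand-in for a network call: fails about 10% of the time."""
    if random.random() < 0.1:
        raise ConnectionError('simulated failure')
    return [sum(row) for row in chunk]


if __name__ == '__main__':
    rows = [[i, i + 100, i + 200] for i in range(100)]
    chunks = [rows[i:i + 10] for i in range(0, 100, 10)]
    with tempfile.TemporaryDirectory() as d:
        for i_run in range(5):
            results = cached_by_chunk(flaky_sum, chunks, d)
            n_ok = sum(r is not None for r in results)
            print(f'Run #{i_run + 1}: {n_ok}/{len(chunks)} chunks available')
```

Because successful chunks are never recomputed, the number of available chunks can only grow from run to run, which is the behaviour the expected output above asks for. The trade-off is granularity: smaller chunks waste less work on a failure but produce more cache files.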