How to (deterministically) cache a function that returns Dask dataframes in a chunked manner?

Problem description

I want to deterministically cache arbitrary functions that return Dask dataframes to parquet files, in a chunked manner. I say "deterministically" to distinguish this from Dask's opportunistic caching mechanism, which is nice, but not what I am looking for.
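
For reference, Dask's opportunistic caching keeps recently used, expensive-to-compute task results in memory (it requires the cachey package) and is enabled globally like this; it is shown here only for contrast with what I want:

from dask.cache import Cache

# Dask's built-in opportunistic caching: holds up to ~2 GB of "valuable"
# intermediate task results in memory. This is the mechanism I am *not*
# looking for.
cache = Cache(2e9)
cache.register()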

So far I have this decorator, which implements the caching mechanism I want, but in a non-chunked way:

import base64
from functools import wraps
import hashlib
import json
from pathlib import Path

import dask.dataframe as dd

def ezhash(o):
    o = json.dumps(o, sort_keys=True)
    o = hashlib.md5(o.encode('utf-8')).digest()
    o = base64.urlsafe_b64encode(o).decode('ascii')
    return o

def cached_compute(enabled=True, engine='pyarrow', write_kwargs={}, read_kwargs={}):
    def _cached_compute(f):
        """
        Either compute and cache function f, or get its cached value
        operates out-of-core
        adds a named parameter input_state to f
        :param f: The function to cache
            f must return a (lazy) dask container containing parquet-serializable dtypes
            if any column is not parquet-serializable (in particular, array objects), you get errors like this:
                ValueError: Error converting column "page_ssb" to bytes using encoding UTF8.
                Original error: bad argument type for built-in operation
        :param input_state: Optional input state to cache, instead of caching f's arguments
            if input_state is not given: compute key from f's name and arguments
            if input_state is given: compute key from f's name and input_state
        """
        @wraps(f)
        def g(*args, input_state=None, **kwargs):
            if not enabled:
                return f(*args, **kwargs)
            if input_state is None:
                key = ezhash({'name': f.__name__, 'args': args, 'kwargs': kwargs})
            else:
                key = ezhash({'name': f.__name__, 'input_state': input_state})
            key_path = Path(f'parquet_cache/{key}/')
            cache_keys = [path.name for path in Path('parquet_cache').glob('*')]
            if key not in cache_keys:
                print(f'caching to {key}')
                res = f(*args, **kwargs)
                res.to_parquet(key_path, engine=engine, **write_kwargs)
            print(f'reading cache {key}')
            res = dd.read_parquet(key_path, engine=engine, **read_kwargs)
            return key, res
        return g
    return _cached_compute
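
For illustration, here is how the decorator is meant to be used (load is a hypothetical placeholder function): the decorated function returns a (key, dataframe) tuple, and passing input_state makes the cache key depend on that state instead of on the arguments, which is useful when the arguments are not JSON-serializable:

@cached_compute(write_kwargs={'schema': 'infer'})
def load(paths):
    return dd.read_csv(paths)

# key is derived from the function name and input_state; args are not hashed
key, ddf = load('data/*.csv', input_state={'dataset': 'v1'})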

This works fine for caching the overall result of a function that returns a Dask dataframe. However, I am now dealing with a function that makes network calls, which means it can fail intermittently. I would like the caching mechanism above to cache results per row or per chunk, so that re-running the function loads the cached rows/chunks, but retries the function on the failed rows/chunks (and subsequently caches their results).

Does anyone have any ideas on how to do this?

Update 2021-10-05: added expected input and output

Append the following test code to the above:

import pandas as pd
import random

def network_call(row):
    """Simulate a network call"""
    if random.random() > 0.1:
        # succeed 90% of the time
        response = sum(row.tolist())
    else:
        # fail 10% of the time
        response = None
    row['response'] = response
    return row

def run_example(cache_enabled, max_runs=5):
    def expensive_function(input_data):
        ddf = dd.from_pandas(pd.DataFrame(input_data), npartitions=4)
        ddf = ddf.apply(network_call, axis=1, meta={k: 'int64' for k in [*input_data.keys(), 'response']})
        return ddf

    if cache_enabled:
        # apply caching decorator
        expensive_function = cached_compute(write_kwargs={'schema': 'infer'})(expensive_function)

    input_data = {
        'col_1': list(range(  0, 100)),
        'col_2': list(range(100, 200)),
        'col_3': list(range(200, 300)),
    }

    prev_success = 0
    for i_run in range(max_runs):
        if cache_enabled:
            _, ddf = expensive_function(input_data)
        else:
            ddf = expensive_function(input_data)
        n_success = ddf['response'].notnull().sum().compute()
        print(f'''Run #{i_run + 1:>2}: Total successes {n_success:>3}/100 (+{n_success - prev_success:>2})''')
        prev_success = n_success

print('''Running with cache disabled''')
run_example(cache_enabled=False)

print('''\nRunning with cache enabled''')
run_example(cache_enabled=True)

Running it:

$ python test_.py 
Running with cache disabled
Run # 1: Total successes  92/100 (+92)
Run # 2: Total successes  92/100 (+ 0)
Run # 3: Total successes  97/100 (+ 5)
Run # 4: Total successes  92/100 (+-5)
Run # 5: Total successes  90/100 (+-2)

Running with cache enabled
caching to 9Egrh7PYPHO2yoGMnNCBpg==
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 1: Total successes  89/100 (+89)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 2: Total successes  89/100 (+ 0)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 3: Total successes  89/100 (+ 0)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 4: Total successes  89/100 (+ 0)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 5: Total successes  89/100 (+ 0)

Observe how the caching mechanism currently loads the first run's results regardless of success, and how it never re-runs the failed computations: the cache key depends only on the function's name and arguments, so once the parquet files exist, every subsequent run simply reads them back.

A successful answer to this question should produce output like this:

caching to 9Egrh7PYPHO2yoGMnNCBpg==
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 1: Total successes  89/100 (+89)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 2: Total successes  98/100 (+ 9)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 3: Total successes 100/100 (+ 2)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 4: Total successes 100/100 (+ 0)
reading cache 9Egrh7PYPHO2yoGMnNCBpg==
Run # 5: Total successes 100/100 (+ 0)

Tags: python, pandas, caching, dask, dask-dataframe

Solution
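
One possible direction, sketched under a few assumptions rather than as a definitive implementation: move the caching from the whole dataframe down to individual partitions. Each partition is written to its own parquet file, and on later runs the expensive call is re-run only for the rows of that partition whose response is null. The sketch below reuses network_call and the 'response'-column convention from the test code above, relies on the partition_info keyword that Dask passes to functions mapped with map_partitions, and leaves key computation to ezhash as before:

import pandas as pd
import dask.dataframe as dd
from pathlib import Path

def process_partition(part, cache_key, partition_info=None):
    """Compute or patch the cache for a single partition.

    Failed rows are assumed to be marked by a null 'response' value,
    as in the test code above. Dask fills in partition_info when this
    function is used with map_partitions.
    """
    part_path = Path(f'parquet_cache/{cache_key}/part-{partition_info["number"]}.parquet')
    if part_path.exists():
        # load the cached partition and retry only its failed rows
        part = pd.read_parquet(part_path)
        failed = part['response'].isna()
        if failed.any():
            # drop the stale response column so network_call rebuilds it
            retried = part.loc[failed].drop(columns='response').apply(network_call, axis=1)
            part.loc[failed, 'response'] = retried['response']
    else:
        # first run for this partition: compute every row
        part = part.apply(network_call, axis=1)
        # None becomes NaN, so the column is float rather than int
        part['response'] = part['response'].astype('float64')
    part_path.parent.mkdir(parents=True, exist_ok=True)
    part.to_parquet(part_path)
    return part

def expensive_function_chunked(input_data, cache_key):
    ddf = dd.from_pandas(pd.DataFrame(input_data), npartitions=4)
    # 'response' may hold nulls after failures, so declare it as float
    meta = {**{k: 'int64' for k in input_data}, 'response': 'float64'}
    return ddf.map_partitions(process_partition, cache_key=cache_key, meta=meta)

Writing the per-partition parquet files as a side effect inside map_partitions keeps the computation out-of-core. Every call to compute() re-executes process_partition, which reads the cached partition, retries its failed rows, and persists any newly successful responses, so repeated runs should converge on 100/100 as in the expected output above.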

