首页 > 解决方案 > 将 pandas 数据帧缩放并连接成一个 dask 数据帧

问题描述

我有一个相当大的熊猫数据框df。我也有熊猫系列的比例因子factors

我想df针对每个比例因子进行缩放,factors并将这些数据帧连接在一起形成一个更大的数据帧。由于这个大数据帧不适合内存,我认为使用 dask 数据帧可能会很好。但我不知道如何解决这个问题。

以下是我想要实现的,但使用熊猫数据框。在dflarge实际情况下将不适合内存。

import random
import pandas as pd

df = pd.DataFrame({
        'id1': range(1,6), 
        'a': [random.random() for i in range(5)], 
        'b': [random.random() for i in range(5)],
    })
df = df.set_index('id1')

factors = [random.random() for i in range(10)]

dflist = []

for i, factor in enumerate(factors):
    scaled = df*factor
    scaled['id2'] = i
    dflist.append(scaled)

dflarge = pd.concat(dflist)
dflarge = dflarge.reset_index().set_index(['id1', 'id2'])

我想让缩放和连接尽可能高效,因为会有数以万计的比例因子。如果可能的话,我想运行它。

我非常感谢您提供的任何帮助。

标签: pythonpandasdataframedaskdask-distributed

解决方案


只是延迟它!

Dask.dataframe并且dask.delayed是您在这里需要的,并且使用它运行它dask.distributed应该可以正常工作。假设它df仍然是一个pandas.DataFrame,将循环转换为一个函数,您可以使用它在列表推导中调用dask.delayed。我在下面对您的代码进行了一些小改动:

import random
import pandas as pd
import dask.dataframe as dd
from dask import delayed

df = pd.DataFrame({
        'id1': range(1,6), 
        'a': [random.random() for i in range(5)], 
        'b': [random.random() for i in range(5)],
    })
df = df.set_index('id1')

factors = [random.random() for i in range(10)]

dflist = []

def scale_my_df(df_init, scale_factor, id_num):
    '''
    Scales and returns a DataFrame.
    '''
    df_scaled = df_init * scale_factor
    df_scaled['id2'] = id_num
    return df_scaled

dfs_delayed = [delayed(scale_my_df)(df_init=df, scale_factor=factor, id_num=i) 
               for i, factor in enumerate(factors)]
ddf = dd.from_delayed(dfs_delayed)

现在你已经dask.DataFrame从你的 scaled pandas.DataFrames 构建了一个。有两点需要注意:

  1. Dask是懒惰的,因此在此代码片段的末尾没有计算任何内容。计算图已设置为创建所需的 DataFrame 所需的操作。在这个带有小 DataFrame 的示例中,您可以执行:

    ddf_large = ddf.compute()

假设它们相同,您将与上面的pandas.DataFrame代码相同。几乎...dflargefactors

  1. 在撰写本文时dask,似乎不支持多级索引,因此您的.set_index(['id1', 'id2'])代码将无法工作。这已在问题 #1493中提出,如果您确实需要多级索引,有一些解决方法。

编辑:

  1. 如果原始数据df非常大,例如已经最大化内存,则将其转换为.csv或其他pandas可读格式,并将其构建到 scale 函数中可能是必要的,即:
    def scale_my_df(df_filepath, scale_factor, id_num):
        '''
        Scales and returns a DataFrame.
        '''
        df_init = pd.read_csv(df_filepath)
        df_scaled = df_init * scale_factor
        df_scaled['id2'] = id_num
        return df_scaled

并相应地调整其余代码。的想法dask是将数据保留在内存之外,但是构建计算图和保存中间值会产生一些开销。


推荐阅读