首页 > 解决方案 > 将分类列转换为附加列

问题描述

我有一个大型数据集,格式为我之前从 avro 文件加载的以下数据框

时间戳 ID 类别 价值
2021-01-01 00:00:00+00:00 一个 d G
2021-01-01 00:10:00+00:00 一个 d H
2021-01-01 00:10:00+00:00 一个 e H
2021-01-01 00:00:00+00:00 b e H

我想旋转category列(其中包含大约 50 个不同的类别)并沿timestampid列进行重复数据删除,因此结果如下所示

ID 时间戳 d e
一个 2021-01-01 00:00:00+00:00 G
一个 2021-01-01 00:10:00+00:00 H H
b 2021-01-01 00:00:00+00:00 H

我知道如何将多索引与/操作pandas一起使用,但是我的数据集太大而无法在没有手动批处理的情况下使用,并且不支持多索引。有什么方法可以有效地完成吗?stackunstackpandasdaskdask

编辑:

正如@Dahn 所指出的,我用熊猫创建了一个最小的合成示例:


import pandas as pd

records = [
    {'idx': 0, 'id': 'a', 'category': 'd', 'value': 1},
    {'idx': 1, 'id': 'a', 'category': 'e', 'value': 2},
    {'idx': 2, 'id': 'a', 'category': 'f', 'value': 3},
    {'idx': 0, 'id': 'b', 'category': 'd', 'value': 4},
    {'idx': 1, 'id': 'c', 'category': 'e', 'value': 5},
    {'idx': 2, 'id': 'c', 'category': 'f', 'value': 6}
]

frame = pd.DataFrame(records)
   idx id category  value
0    0  a        d      1
1    1  a        e      2
2    2  a        f      3
3    0  b        d      4
4    1  c        e      5
5    2  c        f      6
frame = frame.set_index(['id', 'idx', 'category'], drop=True).unstack().droplevel(0, axis=1).reset_index()
frame.columns.name = ''
  id  idx    d    e    f
0  a    0  1.0  NaN  NaN
1  a    1  NaN  2.0  NaN
2  a    2  NaN  NaN  3.0
3  b    0  4.0  NaN  NaN
4  c    1  NaN  5.0  NaN
5  c    2  NaN  NaN  6.0


标签: pythondaskdask-dataframe

解决方案


我不相信 Dask 会在 2021 年 10 月实现这一点。这可能是因为不支持多索引,这unstack需要。不过,最近在这方面有一些工作。

但是,我认为这仍然应该可以使用apply-concat-apply 范例(和apply_concat_apply功能)。

下面的解决方案适用于您提供的示例,原则上,我认为它应该可以正常工作,但我不确定。请谨慎行事,并在可能的情况下检查结果是否与 Pandas 提供的结果一致。我也将此作为功能请求发布在 Dask 的 github 上。

import dask.dataframe as dd

# Create Dask DataFrame out of your `frame`
# npartitions is more than 1 to demonstrate this works on a partitioned datataset
df = dd.from_pandas(frame, npartitions=3)

# Dask needs to know work out what the categories are
# Alternatively you can use df.categorize
# See https://docs.dask.org/en/latest/dataframe-categoricals.html
category = 'category'
df[category] = df[category].astype(category).cat.as_known()

# Dask needs to know what the resulting DataFrame looks like
new_columns = pd.CategoricalIndex(df[category].cat.categories, name=category)
meta = pd.DataFrame(columns=new_columns, 
                    index=df._meta.set_index(['idx', 'id']).index)

# Implement using apply_concat_apply ("aca")
# More details: https://blog.dask.org/2019/10/08/df-groupby
def identity(x): return x

def my_unstack(x):
    return x.set_index(['id', 'idx', 'category'], drop=True).unstack()
    
def combine(x):
    return x.groupby(level=[0, 1]).sum()

result = dd.core.apply_concat_apply([df], 
                   chunk=identity, 
                   aggregate=my_unstack, 
                   combine=combine,
                   meta=meta)

result.compute()

选项 B:map_partitions

idx如果您已经能够根据或中的至少一个对数据进行排序id,那么您也可以简单地使用map_partitions每个分区并将其视为 Pandas 数据框。

这应该会导致内存使用和整体性能的显着改善。

# df has sorted index `idx` in this scenario

category = 'category'
existing_categories = df[category].astype(category).cat.as_known().cat.categories
categories = [('value', cat) for cat in existing_categories]

new_columns = pd.MultiIndex.from_tuples(categories, names=(None, category))

meta = pd.DataFrame(columns=new_columns, 
                    index=df._meta.set_index(['idx', 'id']).index)

def unstack_add_columns(x):
    x = x.set_index(['id', 'category'], append=True, drop=True).unstack()
    # make sure that result contains all necessary columns
    return x.reindex(columns=new_columns) 

df.map_partitions(unstack_add_columns, meta=meta)

如果你不能保证 idx 会被排序,你可以尝试类似

df_sorted = df.set_index('idx')
# I recommend saving to disk in between set_index and the rest
df_sorted.to_parquet('data-sorted.parq')

但这本身可能会带来内存问题。


推荐阅读