python - 将分类列转换为附加列
问题描述
我有一个大型数据集,格式为我之前从 avro 文件加载的以下数据框
时间戳 | ID | 类别 | 价值 |
---|---|---|---|
2021-01-01 00:00:00+00:00 | 一个 | d | G |
2021-01-01 00:10:00+00:00 | 一个 | d | H |
2021-01-01 00:10:00+00:00 | 一个 | e | H |
2021-01-01 00:00:00+00:00 | b | e | H |
我想旋转category
列(其中包含大约 50 个不同的类别)并沿timestamp
和id
列进行重复数据删除,因此结果如下所示
ID | 时间戳 | d | e |
---|---|---|---|
一个 | 2021-01-01 00:00:00+00:00 | G | 楠 |
一个 | 2021-01-01 00:10:00+00:00 | H | H |
b | 2021-01-01 00:00:00+00:00 | 楠 | H |
我知道如何将多索引与/操作pandas
一起使用,但是我的数据集太大而无法在没有手动批处理的情况下使用,并且不支持多索引。有什么方法可以有效地完成吗?stack
unstack
pandas
dask
dask
编辑:
正如@Dahn 所指出的,我用熊猫创建了一个最小的合成示例:
import pandas as pd
records = [
{'idx': 0, 'id': 'a', 'category': 'd', 'value': 1},
{'idx': 1, 'id': 'a', 'category': 'e', 'value': 2},
{'idx': 2, 'id': 'a', 'category': 'f', 'value': 3},
{'idx': 0, 'id': 'b', 'category': 'd', 'value': 4},
{'idx': 1, 'id': 'c', 'category': 'e', 'value': 5},
{'idx': 2, 'id': 'c', 'category': 'f', 'value': 6}
]
frame = pd.DataFrame(records)
idx id category value
0 0 a d 1
1 1 a e 2
2 2 a f 3
3 0 b d 4
4 1 c e 5
5 2 c f 6
frame = frame.set_index(['id', 'idx', 'category'], drop=True).unstack().droplevel(0, axis=1).reset_index()
frame.columns.name = ''
id idx d e f
0 a 0 1.0 NaN NaN
1 a 1 NaN 2.0 NaN
2 a 2 NaN NaN 3.0
3 b 0 4.0 NaN NaN
4 c 1 NaN 5.0 NaN
5 c 2 NaN NaN 6.0
解决方案
我不相信 Dask 会在 2021 年 10 月实现这一点。这可能是因为不支持多索引,这unstack
需要。不过,最近在这方面有一些工作。
但是,我认为这仍然应该可以使用apply-concat-apply 范例(和apply_concat_apply
功能)。
下面的解决方案适用于您提供的示例,原则上,我认为它应该可以正常工作,但我不确定。请谨慎行事,并在可能的情况下检查结果是否与 Pandas 提供的结果一致。我也将此作为功能请求发布在 Dask 的 github 上。
import dask.dataframe as dd
# Create Dask DataFrame out of your `frame`
# npartitions is more than 1 to demonstrate this works on a partitioned datataset
df = dd.from_pandas(frame, npartitions=3)
# Dask needs to know work out what the categories are
# Alternatively you can use df.categorize
# See https://docs.dask.org/en/latest/dataframe-categoricals.html
category = 'category'
df[category] = df[category].astype(category).cat.as_known()
# Dask needs to know what the resulting DataFrame looks like
new_columns = pd.CategoricalIndex(df[category].cat.categories, name=category)
meta = pd.DataFrame(columns=new_columns,
index=df._meta.set_index(['idx', 'id']).index)
# Implement using apply_concat_apply ("aca")
# More details: https://blog.dask.org/2019/10/08/df-groupby
def identity(x): return x
def my_unstack(x):
return x.set_index(['id', 'idx', 'category'], drop=True).unstack()
def combine(x):
return x.groupby(level=[0, 1]).sum()
result = dd.core.apply_concat_apply([df],
chunk=identity,
aggregate=my_unstack,
combine=combine,
meta=meta)
result.compute()
选项 B:map_partitions
idx
如果您已经能够根据或中的至少一个对数据进行排序id
,那么您也可以简单地使用map_partitions
每个分区并将其视为 Pandas 数据框。
这应该会导致内存使用和整体性能的显着改善。
# df has sorted index `idx` in this scenario
category = 'category'
existing_categories = df[category].astype(category).cat.as_known().cat.categories
categories = [('value', cat) for cat in existing_categories]
new_columns = pd.MultiIndex.from_tuples(categories, names=(None, category))
meta = pd.DataFrame(columns=new_columns,
index=df._meta.set_index(['idx', 'id']).index)
def unstack_add_columns(x):
x = x.set_index(['id', 'category'], append=True, drop=True).unstack()
# make sure that result contains all necessary columns
return x.reindex(columns=new_columns)
df.map_partitions(unstack_add_columns, meta=meta)
如果你不能保证 idx 会被排序,你可以尝试类似
df_sorted = df.set_index('idx')
# I recommend saving to disk in between set_index and the rest
df_sorted.to_parquet('data-sorted.parq')
但这本身可能会带来内存问题。
推荐阅读
- python - Pyglet 将纹理区域 blit 到纹理上
- java - 停止在 Java 中以编程方式执行 Spring JDBC 查询
- excel - 将 Excel 文件另存为 CSV,而不截断文本格式的值
- sqlalchemy - 工厂男孩:在模型上没有外键时创建关系
- python - 在 python 请求中使用代理
- .net - 使用具有强密码的 SSH.NET
- java - 将改造响应从一个类发送到一个活动
- python - 排序所需的多标签时间序列
- python - 我可以在没有 for 循环的情况下将函数应用于 Pandas 数据框中的多个列吗?
- python - 如何在 Python 中使用 NuGet 包?