pyarrow - 在 pyarrow 表中删除重复项?
问题描述
有没有办法使用纯 pyarrow 表对数据进行排序并删除重复项?我的目标是根据最大更新时间戳检索每个 ID 的最新版本。
一些额外的细节:我的数据集通常至少分为两个版本:
- 历史的
- 最后
历史数据集将包括来自某个源的所有更新项目,因此对于发生在其上的每个更改,单个 ID 可能具有重复项(例如,想象 Zendesk 或 ServiceNow 票证,其中票证可以多次更新)
然后我使用过滤器读取历史数据集,将其转换为 pandas DF,对数据进行排序,然后在一些唯一约束列上删除重复项。
dataset = ds.dataset(history, filesystem, partitioning)
table = dataset.to_table(filter=filter_expression, columns=columns)
df = table.to_pandas().sort_values(sort_columns, ascending=True).drop_duplicates(unique_constraint, keep="last")
table = pa.Table.from_pandas(df=df, schema=table.schema, preserve_index=False)
# ds.write_dataset(final, filesystem, partitioning)
# I tend to write the final dataset using the legacy dataset so I can make use of the partition_filename_cb - that way I can have one file per date_id. Our visualization tool connects to these files directly
# container/dataset/date_id=20210127/20210127.parquet
pq.write_to_dataset(final, filesystem, partition_cols=["date_id"], use_legacy_dataset=True, partition_filename_cb=lambda x: str(x[-1]).split(".")[0] + ".parquet")
如果可能的话,最好将转换为熊猫然后返回到表格中。
解决方案
2022 年 3 月编辑:PyArrow 正在添加更多功能,尽管这个功能还没有。我现在的方法是:
def drop_duplicates(table: pa.Table, column_name: str) -> pa.Table:
unique_values = pc.unique(table[column_name])
unique_indices = [pc.index(table[column_name], value).as_py() for value in unique_values]
mask = np.full((len(table)), False)
mask[unique_indices] = True
return table.filter(mask=mask)
//结束编辑
我看到你的问题是因为我有一个类似的问题,我为我的工作解决了这个问题(由于 IP 问题,我无法发布整个代码,但我会尽量回答。我从来没有这样做过前)
import pyarrow.compute as pc
import pyarrow as pa
import numpy as np
array = table.column(column_name)
dicts = {dct['values']: dct['counts'] for dct in pc.value_counts(array).to_pylist()}
for key, value in dicts.items():
# do stuff
我使用“value_counts”来查找唯一值以及它们有多少(https://arrow.apache.org/docs/python/generated/pyarrow.compute.value_counts.html)。然后我迭代了这些值。如果值为 1,我使用
mask = pa.array(np.array(array) == key)
row = table.filter(mask)
如果计数大于 1,我再次使用 numpy 布尔数组作为掩码选择第一个或最后一个。
迭代后它就像 pa.concat_tables(tables) 一样简单
警告:这是一个缓慢的过程。如果您需要快速和肮脏的东西,请尝试“唯一”选项(也在我提供的同一链接中)。
编辑/额外::您可以通过在迭代字典时保持一个布尔掩码的numpy数组来使其更快/更少内存密集型。然后最后你返回一个“table.filter(mask=boolean_mask)”。我不知道如何计算速度...
edit2:( 很抱歉进行了许多编辑。我一直在进行大量重构并试图让它更快地工作。)
您也可以尝试以下方法:
def drop_duplicates(table: pa.Table, col_name: str) ->pa.Table:
column_array = table.column(col_name)
mask_x = np.full((table.shape[0]), False)
_, mask_indices = np.unique(np.array(column_array), return_index=True)
mask_x[mask_indices] = True
return table.filter(mask=mask_x)
推荐阅读
- r - ggplot:将线条轮廓添加到散点图
- c# - 由于某种原因,Html.TextBoxFor 上的必填字段被忽略
- amazon-web-services - 在 SAM 项目中部署 lambda 时如何忽略冗余文件
- phpstorm - PhpStorm 添加 $PROJECT_DIR$ 作为包含路径
- r - R devtools包安装找不到openssl
- ios - xcode:xcode 11.2 中的 iOS 10 及更低版本的模拟器
- java - 如何在不创建 JSOUP 文档的情况下处理图像标签
- javascript - $("#tableID").load(location.href + "#tableID") 正在创建新表而不是刷新现有表
- c# - 在 LINQ c# 中将 OR 转换为 And 条件数组值
- logstash - 简单系统日志的 Grok 模式