首页 > 解决方案 > 在 pyarrow 表中删除重复项?

问题描述

有没有办法使用纯 pyarrow 表对数据进行排序并删除重复项?我的目标是根据最大更新时间戳检索每个 ID 的最新版本。

一些额外的细节:我的数据集通常至少分为两个版本:

历史数据集将包括来自某个源的所有更新项目,因此对于发生在其上的每个更改,单个 ID 可能具有重复项(例如,想象 Zendesk 或 ServiceNow 票证,其中票证可以多次更新)

然后我使用过滤器读取历史数据集,将其转换为 pandas DF,对数据进行排序,然后在一些唯一约束列上删除重复项。

dataset = ds.dataset(history, filesystem, partitioning)
table = dataset.to_table(filter=filter_expression, columns=columns)
df = table.to_pandas().sort_values(sort_columns, ascending=True).drop_duplicates(unique_constraint, keep="last")
table = pa.Table.from_pandas(df=df, schema=table.schema, preserve_index=False)

# ds.write_dataset(final, filesystem, partitioning)

# I tend to write the final dataset using the legacy dataset so I can make use of the partition_filename_cb - that way I can have one file per date_id. Our visualization tool connects to these files directly
# container/dataset/date_id=20210127/20210127.parquet

pq.write_to_dataset(final, filesystem, partition_cols=["date_id"], use_legacy_dataset=True, partition_filename_cb=lambda x: str(x[-1]).split(".")[0] + ".parquet")

如果可能的话,最好将转换为熊猫然后返回到表格中。

标签: pyarrow

解决方案


2022 年 3 月编辑:PyArrow 正在添加更多功能,尽管这个功能还没有。我现在的方法是:

def drop_duplicates(table: pa.Table, column_name: str) -> pa.Table:
    unique_values = pc.unique(table[column_name])
    unique_indices = [pc.index(table[column_name], value).as_py() for value in unique_values]
    mask = np.full((len(table)), False)
    mask[unique_indices] = True
    return table.filter(mask=mask)

//结束编辑

我看到你的问题是因为我有一个类似的问题,我为我的工作解决了这个问题(由于 IP 问题,我无法发布整个代码,但我会尽量回答。我从来没有这样做过前)

import pyarrow.compute as pc
import pyarrow as pa
import numpy as np

array = table.column(column_name)
dicts = {dct['values']: dct['counts'] for dct in pc.value_counts(array).to_pylist()}
for key, value in dicts.items():
    # do stuff

我使用“value_counts”来查找唯一值以及它们有多少(https://arrow.apache.org/docs/python/generated/pyarrow.compute.value_counts.html)。然后我迭代了这些值。如果值为 1,我使用

mask = pa.array(np.array(array) == key)
row = table.filter(mask)

如果计数大于 1,我再次使用 numpy 布尔数组作为掩码选择第一个或最后一个。

迭代后它就像 pa.concat_tables(tables) 一样简单

警告:这是一个缓慢的过程。如果您需要快速和肮脏的东西,请尝试“唯一”选项(也在我提供的同一链接中)。

编辑/额外::您可以通过在迭代字典时保持一个布尔掩码的numpy数组来使其更快/更少内存密集型。然后最后你返回一个“table.filter(mask=boolean_mask)”。我不知道如何计算速度...

edit2:( 很抱歉进行了许多编辑。我一直在进行大量重构并试图让它更快地工作。)

您也可以尝试以下方法:

def drop_duplicates(table: pa.Table, col_name: str) ->pa.Table:
    column_array = table.column(col_name)
    mask_x = np.full((table.shape[0]), False)
    _, mask_indices = np.unique(np.array(column_array), return_index=True)
    mask_x[mask_indices] = True
    return table.filter(mask=mask_x)

推荐阅读