首页 > 解决方案 > 是否可以根据时间边界而不是记录数在 Apache Arrow 中定义记录批次?

问题描述

在阅读Apache Arrow时,我遇到了 Record Batches 的概念,顾名思义,它对多个记录进行批处理以启用流处理。即:每批处理记录,而不必接收整个流。

我看到的示例为每 X 条记录创建一个新的记录批次。是否也可以按其他标准创建记录批次?具体来说,我想将同一小时的记录分组到同一个记录批次中。换句话说:允许通过一些可配置的时间边界创建记录批次。

这可能吗?

标签: pyarrowapache-arrow

解决方案


记录批次是一组列,其中每列具有相同的长度。您可以使用您想要的任何标准将表(或记录批次)分成更小的批次。

我不确定您是在建立批次还是将现有的表/批次分解成更小的批次。

目前,您将不得不自己进行分组。这是一个使用 Pandas 获取具有随机日期的数据框并将其转换为每个月都有自己的记录批次的表的示例。

import numpy as np
import pandas as pd
import pyarrow as pa

def random_dates(start, end, n):

    start_u = start.value//10**9
    end_u = end.value//10**9

    return pd.to_datetime(np.random.randint(start_u, end_u, n), unit='s')

start = pd.to_datetime('2015-01-01')
end = pd.to_datetime('2018-01-01')
dates = random_dates(start, end, 10000)

df = pd.DataFrame({'dates': dates})
sub_dfs = [sub_df for _, sub_df in df.groupby(pd.Grouper(key='dates', freq='M'))]
tables = [pa.Table.from_pandas(sub_df) for sub_df in sub_dfs]
batches = [batch for table in tables for batch in table.to_batches()]
table = pa.Table.from_batches(batches)

作为https://issues.apache.org/jira/browse/ARROW-11591的一部分,pyarrow 内部的group_by 计算函数正在做一些工作。完成后,您将能够使用 pyarrow 表达式而不是 pandas 来对表进行分区。


推荐阅读