首页 > 解决方案 > Spark DataFrame 聚合:windows + 分区与 groupBy 操作

问题描述

我希望对大量数据执行大约五种不同的汇总技术。通常,我希望在某些时间窗口和其他维度上计算平均值、最小值、最大值、标准差和总和。

这是我可以制作的一个可重复的示例:

import random
import string
import datetime

from pyspark.sql import SparkSession, functions as func
from pyspark.conf import SparkConf
from pyspark.sql.types import StringType, DoubleType, IntegerType
from pyspark.sql.window import Window

########## Setting up DataFrame ##########
def random_date(start, n):
    current = start
    for _ in range(n):
        current = current + datetime.timedelta(seconds=random.randrange(60))
        yield current

start_date = datetime.datetime(2013, 9, 20, 13, 00)

n_records = 50000000

dates = list(random_date(start_date, n_records))

other_data = []
for d in dates:
    categorical_data = tuple(random.choice(string.ascii_lowercase) for _ in range(1))
    numerical_data = tuple(random.randrange(100) for _ in range(20))
    other_data.append(categorical_data + numerical_data + (d,))

categorical_columns = ['cat_{}'.format(n) for n in range(1)]
numerical_columns = ['num_{}'.format(n) for n in range(20)]
date_column = ['date']

columns = categorical_columns + numerical_columns + date_column

df = sc.parallelize(other_data).toDF(columns)

df = df.withColumn('date_window', func.window('date', '5 minutes'))

df.registerTempTable('df')

########## End DataFrame setup ##########

迄今为止,我尝试了两种技术:一种使用内置DataFrame.groupBy机制;另一种使用内置机制。另一个使用pyspark.sql.window.Window'sorderBypartitionBy方法。

一般来说,我开发的管道如下所示:

  1. 对于每个数值列,按分类列cat_0和分组date_window,并计算前面列出的五个汇总统计量。
  2. 对于该pyspark.sql.window.Window方法,直接加入计算列,使用df.withColumn. 对于该DataFrame.groupBy方法,跟踪每个结果数据帧(每个数据帧有三列:两列用于分组列,一列用于计算列) - 最后,通过基本上以分组列作为键执行归约操作来连接每个数据帧。

我在下面留下了一些管道代码,但主要对以下观点感兴趣:1)这些中的任何一个是否是此类工作的“最佳实践”,2)如果不是,我是否缺少 Spark 生态系统中的主要内容可以帮助我更快/用更少的资源做到这一点吗?

目前,该groupBy方法的性能稍好一些,但有点麻烦,因为必须跟踪每个分组的 DataFrame 并在最后将它们全部归约。这种Window方法并不是很好,尽管它在语法上更简洁,更易于维护,IMO。在任何一种情况下,我都必须分配大量计算才能让作业运行并在最后写入磁盘(无需重新分区/合并)。

gb_cols = ['cat_0', 'date_window']
strategies = {'sum', 'mean', 'stddev', 'max', 'min'}

Xcols = [col for col in df.columns if col.startswith('num')]

for col in Xcols[:]:
    for s in strategies:
        new_col = '{}_{}'.format(col, s)
        Xcols.append(new_col)

        if s == 'mean':
            calc_col_series = func.mean(col)
        elif s == 'stddev':
            calc_col_series = func.stddev(col)
        elif s == 'max':
            calc_col_series = func.max(col)
        elif s == 'min':
            calc_col_series = func.min(col)
        elif s == 'sum':
            calc_col_series = func.sum(col)
        elif s == 'median':
            query = '''
                SELECT
                    PERCENTILE_APPROX({}, 0.5)
                FROM df
                GROUP BY {}
            '''.format(col, ','.join(gb_cols))
            calc_col_series = spark.sql(query)

        df = df.withColumn(new_col, calc_col_series.over(agg_window))

        # Differencing inputs
        for difference in range(1, 3 + 1):
            # Last period's datapoints... moved to the future
            led_series = func.lag(df[new_col], difference).over(agg_window.orderBy(window_cols['orderBy']))
            diff_series = df[new_col] - led_series

            new_col_diff = '{}_{}_diff'.format(new_col, difference)
            df = df.withColumn(new_col_diff, diff_series)
            Xcols.append(new_col_diff)

标签: pythonapache-sparkdataframe

解决方案


推荐阅读