python - Spark DataFrame 聚合:windows + 分区与 groupBy 操作
问题描述
我希望对大量数据执行大约五种不同的汇总技术。通常,我希望在某些时间窗口和其他维度上计算平均值、最小值、最大值、标准差和总和。
这是我可以制作的一个可重复的示例:
import random
import string
import datetime
from pyspark.sql import SparkSession, functions as func
from pyspark.conf import SparkConf
from pyspark.sql.types import StringType, DoubleType, IntegerType
from pyspark.sql.window import Window
########## Setting up DataFrame ##########
def random_date(start, n):
current = start
for _ in range(n):
current = current + datetime.timedelta(seconds=random.randrange(60))
yield current
start_date = datetime.datetime(2013, 9, 20, 13, 00)
n_records = 50000000
dates = list(random_date(start_date, n_records))
other_data = []
for d in dates:
categorical_data = tuple(random.choice(string.ascii_lowercase) for _ in range(1))
numerical_data = tuple(random.randrange(100) for _ in range(20))
other_data.append(categorical_data + numerical_data + (d,))
categorical_columns = ['cat_{}'.format(n) for n in range(1)]
numerical_columns = ['num_{}'.format(n) for n in range(20)]
date_column = ['date']
columns = categorical_columns + numerical_columns + date_column
df = sc.parallelize(other_data).toDF(columns)
df = df.withColumn('date_window', func.window('date', '5 minutes'))
df.registerTempTable('df')
########## End DataFrame setup ##########
迄今为止,我尝试了两种技术:一种使用内置DataFrame.groupBy
机制;另一种使用内置机制。另一个使用pyspark.sql.window.Window
'sorderBy
和partitionBy
方法。
一般来说,我开发的管道如下所示:
- 对于每个数值列,按分类列
cat_0
和分组date_window
,并计算前面列出的五个汇总统计量。 - 对于该
pyspark.sql.window.Window
方法,直接加入计算列,使用df.withColumn
. 对于该DataFrame.groupBy
方法,跟踪每个结果数据帧(每个数据帧有三列:两列用于分组列,一列用于计算列) - 最后,通过基本上以分组列作为键执行归约操作来连接每个数据帧。
我在下面留下了一些管道代码,但主要对以下观点感兴趣:1)这些中的任何一个是否是此类工作的“最佳实践”,2)如果不是,我是否缺少 Spark 生态系统中的主要内容可以帮助我更快/用更少的资源做到这一点吗?
目前,该groupBy
方法的性能稍好一些,但有点麻烦,因为必须跟踪每个分组的 DataFrame 并在最后将它们全部归约。这种Window
方法并不是很好,尽管它在语法上更简洁,更易于维护,IMO。在任何一种情况下,我都必须分配大量计算才能让作业运行并在最后写入磁盘(无需重新分区/合并)。
gb_cols = ['cat_0', 'date_window']
strategies = {'sum', 'mean', 'stddev', 'max', 'min'}
Xcols = [col for col in df.columns if col.startswith('num')]
for col in Xcols[:]:
for s in strategies:
new_col = '{}_{}'.format(col, s)
Xcols.append(new_col)
if s == 'mean':
calc_col_series = func.mean(col)
elif s == 'stddev':
calc_col_series = func.stddev(col)
elif s == 'max':
calc_col_series = func.max(col)
elif s == 'min':
calc_col_series = func.min(col)
elif s == 'sum':
calc_col_series = func.sum(col)
elif s == 'median':
query = '''
SELECT
PERCENTILE_APPROX({}, 0.5)
FROM df
GROUP BY {}
'''.format(col, ','.join(gb_cols))
calc_col_series = spark.sql(query)
df = df.withColumn(new_col, calc_col_series.over(agg_window))
# Differencing inputs
for difference in range(1, 3 + 1):
# Last period's datapoints... moved to the future
led_series = func.lag(df[new_col], difference).over(agg_window.orderBy(window_cols['orderBy']))
diff_series = df[new_col] - led_series
new_col_diff = '{}_{}_diff'.format(new_col, difference)
df = df.withColumn(new_col_diff, diff_series)
Xcols.append(new_col_diff)
解决方案
推荐阅读
- javascript - HTML 表单中的 JSON 数据
- c - C segfault - off-by-one - 大小为 1 的无效写入
- javascript - 如何使用 html / angular / javascript 扩展表格
- php - Ajax Full Featured error 添加新事件
- google-apps-script - 试图追加行我没有掌握什么?
- reactjs - 集成时出现此错误错误:操作必须是普通对象。使用自定义中间件进行异步操作
- javascript - 如何使用整页 iframe 在页面上隐藏移动浏览器地址栏
- web - Power Query Web 抓取但没有表格显示
- python - 如何使用 Python 在控制台中打印图像
- reactjs - 如何处理 TS React 项目中的意外道具?