pyspark - 根据列中的唯一值对 PySpark DataFrame 进行分区(自定义分区)
问题描述
我有一个 PySpark 数据框,其中有用于名称、类型、日期和值的单独列。数据框的示例如下所示:
+------+----+---+-----+
| Name|Type|Day|Value|
+------+----+---+-----+
| name1| a| 1| 140|
| name2| a| 1| 180|
| name3| a| 1| 150|
| name4| b| 1| 145|
| name5| b| 1| 185|
| name6| c| 1| 155|
| name7| c| 1| 160|
| name8| a| 2| 120|
| name9| a| 2| 110|
|name10| b| 2| 125|
|name11| b| 2| 185|
|name12| c| 3| 195|
+------+----+---+-----+
对于 的选定值Type
,我想根据标题为 的列的唯一值创建单独的数据框Day
。比方说,我选择a
了作为我的首选Type
。在上述示例中,我有三个唯一值Day
(即. 1, 2 , 3
)。对于每个唯一值Day
都有一行带有所选Type
a
- (即天数1
和2
上述数据),我想创建一个数据框,其中所有行都带有所选的Type
和Day
。在上面提到的示例中,我将有两个数据框,如下所示
+------+----+---+-----+
| Name|Type|Day|Value|
+------+----+---+-----+
| name1| a| 1| 140|
| name2| a| 1| 180|
| name3| a| 1| 150|
+------+----+---+-----+
和
+------+----+---+-----+
| Name|Type|Day|Value|
+------+----+---+-----+
| name8| a| 2| 120|
| name9| a| 2| 110|
+------+----+---+-----+
我怎样才能做到这一点?在我将使用的实际数据中,我有数百万列。所以,我想知道实现上述目标的最有效方式。
您可以使用下面提到的代码来生成上面给出的示例。
from pyspark.sql import *
import numpy as np
Stats = Row("Name", "Type", "Day", "Value")
stat1 = Stats('name1', 'a', 1, 140)
stat2 = Stats('name2', 'a', 1, 180)
stat3 = Stats('name3', 'a', 1, 150)
stat4 = Stats('name4', 'b', 1, 145)
stat5 = Stats('name5', 'b', 1, 185)
stat6 = Stats('name6', 'c', 1, 155)
stat7 = Stats('name7', 'c', 1, 160)
stat8 = Stats('name8', 'a', 2, 120)
stat9 = Stats('name9', 'a', 2, 110)
stat10 = Stats('name10', 'b', 2, 125)
stat11 = Stats('name11', 'b', 2, 185)
stat12 = Stats('name12', 'c', 3, 195)
解决方案
你可以使用df.repartition("Type", "Day")
当我使用以下函数进行验证时,我得到了提到的输出
def validate(partition):
count = 0
for row in partition:
print(row)
count += 1
print(count)
我的数据
+------+--------------------+-------+-------+
|amount| trans_date|user_id|row_num|
+------+--------------------+-------+-------+
| 99.1|2019-06-04T00:00:...| 101| 1|
| 89.27|2019-06-04T00:00:...| 102| 2|
| 89.1|2019-03-04T00:00:...| 102| 3|
| 73.11|2019-09-10T00:00:...| 103| 4|
|-69.81|2019-09-11T00:00:...| 101| 5|
| 12.51|2018-12-14T00:00:...| 101| 6|
| 43.23|2018-09-11T00:00:...| 101| 7|
+------+--------------------+-------+-------+
在df.repartition("user_id")
我得到以下信息后:
输出
Row(amount=73.11, trans_date='2019-09-10T00:00:00.000+05:30', user_id='103', row_num=4)
1
Row(amount=89.27, trans_date='2019-06-04T00:00:00.000+05:30', user_id='102', row_num=2)
Row(amount=89.1, trans_date='2019-03-04T00:00:00.000+05:30', user_id='102', row_num=3)
2
Row(amount=99.1, trans_date='2019-06-04T00:00:00.000+05:30', user_id='101', row_num=1)
Row(amount=-69.81, trans_date='2019-09-11T00:00:00.000+05:30', user_id='101', row_num=5)
Row(amount=12.51, trans_date='2018-12-14T00:00:00.000+05:30', user_id='101', row_num=6)
Row(amount=43.23, trans_date='2018-09-11T00:00:00.000+05:30', user_id='101', row_num=7)
4
推荐阅读
- bash - 如何在没有美元符号的情况下复制和粘贴 Bash?
- python-3.x - 如何用字母 xtics 绘制在 python 中有更多值的图形?
- scala - 如何使用 scalaPB 在 scala 中生成代数数据类型
- python - 实现归并排序算法以升序对数字列表进行排序
- python - SLURM 中的并行计算并不总是在 Python 中的 sys.exit(0) 之后退出
- haskell - 理解应用于图形的简单函数
- sql - 在红移谱 sql 查询中排除列
- angular - Kendo UI 调度程序错误“this.intlService.weekendRange 不是函数”
- xgboost - XGBoost - 自定义损失函数
- arrays - 如果在此工作簿中找不到工作表,如何找到工作表,如果单元格为空,则移动到下一个单元格,如果找不到,则退出子