pyspark - 使用 Pyspark 中的转换减少多个操作/过滤器优化
问题描述
F1 = df.filter((df.Identifier1>0)).groupBy().avg('Amount')
F2 = df.filter((df.Identifier1>2)).groupBy().avg('Amount')
F3 = df.filter((df.Identifier2<2)).groupBy().avg('Amount')
F4 = df.filter((df.Identifier2<4)).groupBy().avg('Amount')
#Alternatively also tried another way for avg calculation,
F1 = df.filter((df.Identifier1>0)).agg(avg(col('Amount')))
..
计算这些平均值后,我尝试使用平均值计算中使用的相同过滤器将它们分配给主 df 中的记录到两列 A1 和 A2 中。
df = df.withColumn("A1", when((col("Identifier1") > 0)), (F1.collect()[0][0]))
….
….
.otherwise(avg(col('Amount')))
df = df.withColumn("A2", when((col("Identifier2") <2 )), (F3.collect()[0][0]))
….
….
.otherwise(avg(col('Amount')))
我面临两个问题:
当其中一个平均值为 Null 时,我在调用时遇到错误
collect() or first()
错误:
Unsupported literal type class java.util.ArrayList [null]
由于涉及多个操作,该过程需要 2 小时以上。
欢迎对上述内容提供任何帮助。
解决方案
为您的过滤条件创建一列,例如
+---+--------+-----------+-----------+------+
| ID|Category|Identifier1|Identifier2|Amount|
+---+--------+-----------+-----------+------+
| 12| A| 2| 1| 100|
| 23| B| 7| 8| 500|
| 34| C| 1| 4| 300|
+---+--------+-----------+-----------+------+
df.withColumn('group', when(df.Identifier1 > 0, array(lit(1))).otherwise(array(lit(None)))) \
.withColumn('group', when(df.Identifier1 > 2, array_union(col('group'), array(lit(2)))).otherwise(col('group'))) \
.withColumn('group', when(df.Identifier2 < 2, array_union(col('group'), array(lit(3)))).otherwise(col('group'))) \
.withColumn('group', when(df.Identifier2 < 4, array_union(col('group'), array(lit(4)))).otherwise(col('group'))) \
.withColumn('group', explode('group')) \
.groupBy('group').agg(sum('Amount').alias('sum'), avg('Amount').alias('avg')).show()
+-----+---+-----+
|group|sum| avg|
+-----+---+-----+
| 1|900|300.0|
| 3|100|100.0|
| 4|100|100.0|
| 2|500|500.0|
+-----+---+-----+
然后按组分组。
推荐阅读
- makefile - makefile:意外标记“,”附近的语法错误
- c# - How to create a C# application that opens "cmd.exe" in another window?
- python - 所有标记的seaborn散点图标记大小
- javascript - 如何将数据从一个 Marketo 实例提交到另一个?
- singularity-container - 奇点提取、编辑和重建图像
- node.js - ftpPublish 声明性管道
- android - 如何让安卓应用永不关闭?(模拟亭模式)
- javascript - 我的 javascript 模块中的函数未定义
- amazon-web-services - 将 Amazon WorkMail 与其他电子邮件提供商连接
- mysql - 带有 BLOB 列的 MySQL 5.7 批量插入