python - 从 Spark / PySpark 中不一致数组的数据帧列中比较和删除元素
问题描述
我是 Spark 的新手,我找不到我的问题的解决方案,非常感谢任何建议或帮助。
我有一个 Pyspark.sql.dataframe,其中包含两个数组列,其中包含字符串。两个列数组的长度不一致,并且某些行也有 Null 条目。我需要比较这两列,并且必须为 B 列中的每一行删除数组的一个元素,当它在列 OVERRIDE 的数组中的该行中找到时。
+---------------+---------------+
| OVERRIDE | B |
+---------------+---------------+
| ['a']| ['a','b']|
| null| ['b']|
| null| ['a','c']|
| ['d','g']| ['d','g']|
| null| null|
| ['f']| ['f']|
+---------------+---------------+
最后应该是这样的:
+---------------+---------------+
| OVERRIDE | B |
+---------------+---------------+
| ['a']| ['b']|
| null| ['b']|
| null| ['a','c']|
| ['d','g']| null|
| null| null|
| ['f']| null|
+---------------+---------------+
我试过了
from pyspark.sql.functions import array_remove, array_intersect
df = df.withColumn('B', array_remove(df.B, df.OVERRIDE))
并且
df = df.withColumn('B', array_remove(df.B, array_intersect(df.OVERRIDE, df.B)))
但了解到 array_remove() 不能迭代列,而是只能取一个元素(例如'a')将其删除,然后在 B 列的所有行中。
我是否必须构建一个 udf 函数,如果是,我应该怎么做?
解决方案
你可以这样做udf
@udf(returnType=ArrayType(StringType()))
def removeFromRight(override,b):
if(override==None or b==None):
return b
filtered_list=[x for x in b if x not in override]
if(len(filtered_list)==0):
filtered_list=None
return filtered_list
test1=test.withColumn("new_overridden_col",removeFromRight(col("override"),col("b")))
test1.show()
//output of test1
+--------+------+------------------+
|override| b|new_overridden_col|
+--------+------+------------------+
| [a]|[a, b]| [b]|
| null| [b]| [b]|
| null|[a, c]| [a, c]|
| [d, g]| null| null|
| null| null| null|
| [f]| null| null|
+--------+------+------------------+
推荐阅读
- python - 使用 mayavi 绘制 3D 场景时出现意外行为
- java - java中的电子邮件验证,删除@localhost.com,user@9.8.7.6之类的电子邮件
- node.js - 如何正确地将更改推送到 Heroku
- linux - 如何使用 bash 脚本监控 CPU 使用率并随着时间的推移而上升?
- python - 在 python 中创建 json 对象的替代方法是什么?
- sql - 我想使用 SQL SERVER 在 SQL 中获取 while 循环的最终结果(参考代码)
- c - 如何在 Windows 中打印 \n?
- mongodb - 编辑 mongo.conf 后 MongoDB 状态失败(代码退出,状态 = 2)
- akka - 为什么 Akka 远程流量通过 UDP?
- angular - Angular - 大型查询的 Http 请求返回 null