python - PySpark 2.2 爆炸删除空行(如何实现explode_outer)?
问题描述
我正在处理 PySpark 数据框中的一些深度嵌套的数据。当我试图将结构展平为行和列时,我注意到当我调用时,withColumn
如果该行包含null
在源列中,那么该行将从我的结果数据框中删除。相反,我想找到一种方法来保留该行并null
在结果列中。
要使用的示例数据框:
from pyspark.sql.functions import explode, first, col, monotonically_increasing_id
from pyspark.sql import Row
df = spark.createDataFrame([
Row(dataCells=[Row(posx=0, posy=1, posz=.5, value=1.5, shape=[Row(_type='square', _len=1)]),
Row(posx=1, posy=3, posz=.5, value=4.5, shape=[]),
Row(posx=2, posy=5, posz=.5, value=7.5, shape=[Row(_type='circle', _len=.5)])
])
])
我还有一个用于展平结构的函数:
def flatten_struct_cols(df):
flat_cols = [column[0] for column in df.dtypes if 'struct' not in column[1][:6]]
struct_columns = [column[0] for column in df.dtypes if 'struct' in column[1][:6]]
df = df.select(flat_cols +
[col(sc + '.' + c).alias(sc + '_' + c)
for sc in struct_columns
for c in df.select(sc + '.*').columns])
return df
架构如下所示:
df.printSchema()
root
|-- dataCells: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- posx: long (nullable = true)
| | |-- posy: long (nullable = true)
| | |-- posz: double (nullable = true)
| | |-- shape: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- _len: long (nullable = true)
| | | | |-- _type: string (nullable = true)
| | |-- value: double (nullable = true)
起始数据框:
df.show(3)
+--------------------+
| dataCells|
+--------------------+
|[[0,1,0.5,Wrapped...|
+--------------------+
我首先分解数组,因为我想将这个结构数组与结构数组转换为行和列。然后我将结构字段展平为新列。
df = df.withColumn('dataCells', explode(col('dataCells')))
df = flatten_struct_cols(df)
df.show(3)
我的数据看起来像:
+--------------+--------------+--------------+---------------+---------------+
|dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|
+--------------+--------------+--------------+---------------+---------------+
| 0| 1| 0.5| [[1,square]]| 1.5|
| 1| 3| 0.5| []| 4.5|
| 2| 5| 0.5|[[null,circle]]| 7.5|
+--------------+--------------+--------------+---------------+---------------+
一切都很好,正如预期的那样,直到我尝试具有空/空值explode
的列。dataCells_shape
df = df.withColumn('dataCells_shape', explode(col('dataCells_shape')))
df.show(3)
这会将第二行从数据框中删除:
+--------------+--------------+--------------+---------------+---------------+
|dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|
+--------------+--------------+--------------+---------------+---------------+
| 0| 1| 0.5| [1,square]| 1.5|
| 2| 5| 0.5| [null,circle]| 7.5|
+--------------+--------------+--------------+---------------+---------------+
相反,我想保留该行并保留该列的空值以及其他列中的所有值。我尝试创建一个新列而不是在执行时覆盖旧列,.withColumn
explode
并且无论哪种方式都得到相同的结果。
如果行不为空/空,我还尝试创建一个UDF
执行该explode
函数的函数,但我遇到了 JVM 错误处理null
。
from pyspark.sql.functions import udf
from pyspark.sql.types import NullType, StructType
def explode_if_not_null(trow):
if trow:
return explode(trow)
else:
return NullType
func_udf = udf(explode_if_not_null, StructType())
df = df.withColumn('dataCells_shape_test', func_udf(df['dataCells_shape']))
df.show(3)
AttributeError: 'NoneType' object has no attribute '_jvm'
任何人都可以建议我在ArrayType
不丢失行的情况下爆炸或展平列的方法null
吗?
我正在使用 PySpark 2.2.0
编辑:
在作为可能的欺骗提供的链接之后,我尝试实施.isNotNull().otherwise()
提供结构模式的建议解决方案,.otherwise
但该行仍然从结果集中退出。
df.withColumn("dataCells_shape_test", explode(when(col("dataCells_shape").isNotNull(), col("dataCells_shape"))
.otherwise(array(lit(None).cast(df.select(col("dataCells_shape").getItem(0))
.dtypes[0][1])
)
)
)
).show()
+--------------+--------------+--------------+---------------+---------------+--------------------+
|dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|dataCells_shape_test|
+--------------+--------------+--------------+---------------+---------------+--------------------+
| 0| 1| 0.5| [[1,square]]| 1.5| [1,square]|
| 2| 5| 0.5|[[null,circle]]| 7.5| [null,circle]|
+--------------+--------------+--------------+---------------+---------------+--------------------+
解决方案
感谢pault向我指出这个问题和这个关于将 Python 映射到 Java 的问题。我能够通过以下方式获得有效的解决方案:
from pyspark.sql.column import Column, _to_java_column
def explode_outer(col):
_explode_outer = sc._jvm.org.apache.spark.sql.functions.explode_outer
return Column(_explode_outer(_to_java_column(col)))
new_df = df.withColumn("dataCells_shape", explode_outer(col("dataCells_shape")))
+--------------+--------------+--------------+---------------+---------------+
|dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|
+--------------+--------------+--------------+---------------+---------------+
| 0| 1| 0.5| [1,square]| 1.5|
| 1| 3| 0.5| null| 4.5|
| 2| 5| 0.5| [null,circle]| 7.5|
+--------------+--------------+--------------+---------------+---------------+
root
|-- dataCells_posx: long (nullable = true)
|-- dataCells_posy: long (nullable = true)
|-- dataCells_posz: double (nullable = true)
|-- dataCells_shape: struct (nullable = true)
| |-- _len: long (nullable = true)
| |-- _type: string (nullable = true)
|-- dataCells_value: double (nullable = true)
需要注意的是,这适用于 pyspark 2.2 版,因为explode_outer
它是在 spark 2.2 中定义的(但由于某种原因,API 包装器直到 2.3 版才在 pyspark 中实现)。该解决方案为已经实现的 java 函数创建了一个包装器。
推荐阅读
- javascript - Go 和 Node.js base64 编码、解码问题
- airflow - 我杀死了 Airflow 任务,但它的 BashOperator 创建的子进程尚未被杀死
- swift - 如何从明天开始每天重复本地通知
- haskell - 有没有办法访问 Wai Web App 的 cookie 标头?
- oracle - 如何连接到我 PC 上的默认 Oracle10g 实例?
- angular - 输入上的 Angular 6 PipeTransform 在 IE11 上向左跳克拉
- dart - 颤振/飞镖在桌面上作为原生应用程序工作吗?
- laravel - 如何在laravel中传递月份值并获取月份的开始和结束日期
- matlab - 如何在matlab中填充图像之间的空白空间以进行体积成像/ 3D重建
- mysql - 通过 Ruby 问题将 PC 数据记录到数据库中