PySpark Compare Empty Map Literal

Problem Description

I want to drop rows in a PySpark DataFrame where a certain column contains an empty map. How do I do this? I can't seem to declare a typed empty MapType against which to compare my column. I have seen that in Scala, you can use typedLit, but there seems to be no such equivalent in PySpark. I have also tried using lit(...) and casting to a struct<string,int> but I have found no acceptable argument for lit() (tried using None which returns null and {} which is an error).

I'm sure this is trivial but I haven't seen any docs on this!

Tags: apache-spark, pyspark, apache-spark-sql, pyspark-sql

Solution


Here is a solution using the PySpark built-in function size:

from pyspark.sql.functions import col, size

df = spark.createDataFrame(
    [(1, {1: 'A'}),
     (2, {2: 'B'}),
     (3, {3: 'C'}),
     (4, {}),
     (5, None)]
).toDF("id", "map")

df.printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- map: map (nullable = true)
#  |    |-- key: long
#  |    |-- value: string (valueContainsNull = true)

df.withColumn("is_empty", size(col("map")) <= 0).show()

# +---+--------+--------+
# | id|     map|is_empty|
# +---+--------+--------+
# |  1|[1 -> A]|   false|
# |  2|[2 -> B]|   false|
# |  3|[3 -> C]|   false|
# |  4|      []|    true|
# |  5|    null|    true|
# +---+--------+--------+

Note that the condition is size <= 0 because, in the case of null, the function returns -1 (when spark.sql.legacy.sizeOfNull is set to true; otherwise it returns null). See the Spark documentation of spark.sql.legacy.sizeOfNull for more details.
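Applied to the original question, dropping the rows whose map column is empty (or null) then becomes a simple filter. A minimal sketch using the DataFrame above:

from pyspark.sql.functions import col, size

# size(map) is 0 for an empty map and -1/null for a null map,
# so this keeps only the rows with a non-empty map
df.filter(size(col("map")) > 0).show()

# +---+--------+
# | id|     map|
# +---+--------+
# |  1|[1 -> A]|
# |  2|[2 -> B]|
# |  3|[3 -> C]|
# +---+--------+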

General solution: comparing a map column to a literal map

For a more general solution, we can use the built-in function size in combination with a UDF that appends the string key + value for each item to a sorted list (thanks to @jxc for pointing out an issue in a previous version of this answer). The assumption here is that two maps are equal when:

  1. they have the same size
  2. the string representations of key + value match across the maps' items (see the quick illustration right after this list)
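
A quick plain-Python illustration of this equality scheme, with made-up sample items:

# two dicts holding the same items in a different order yield the same
# sorted "key + value" list, so they count as equal under this scheme
sorted([str(k) + str(v) for k, v in {1: 'A', 2: 'B'}.items()])  # ['1A', '2B']
sorted([str(k) + str(v) for k, v in {2: 'B', 1: 'A'}.items()])  # ['1A', '2B']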

The literal map is created from an arbitrary Python dictionary, combining its keys and values via map_from_arrays:

from pyspark.sql.functions import udf, lit, size, when, map_from_arrays, array
from pyspark.sql.types import ArrayType, StringType

df = spark.createDataFrame([
    [1, {}],
    [2, {1: 'A', 2: 'B', 3: 'C'}],
    [3, {1: 'A', 2: 'B'}]
]).toDF("key", "map")

# an ordinary Python dict to be turned into a literal map column
literal_dict = {1: 'A', 2: 'B'}

map_keys_ = array([lit(k) for k in literal_dict.keys()])
map_values_ = array([lit(v) for v in literal_dict.values()])
tmp_map = map_from_arrays(map_keys_, map_values_)

# turns a map into a sorted list of "key + value" strings; the explicit
# ArrayType return type is what lets the lists be compared with == below
to_strlist_udf = udf(lambda d: sorted([str(k) + str(d[k]) for k in d.keys()]),
                     ArrayType(StringType()))

def map_equals(m1, m2):
    return when(
        (size(m1) == size(m2)) &
        (to_strlist_udf(m1) == to_strlist_udf(m2)), True
    ).otherwise(False)

df = df.withColumn("equals", map_equals(df["map"], tmp_map))

df.show(10, False)

# +---+------------------------+------+
# |key|map                     |equals|
# +---+------------------------+------+
# |1  |[]                      |false |
# |2  |[1 -> A, 2 -> B, 3 -> C]|false |
# |3  |[1 -> A, 2 -> B]        |true  |
# +---+------------------------+------+

Note: as you can see, the PySpark == operator also works just fine for comparing arrays.
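
If you would rather avoid the Python UDF, the same sorted key + value list can be built with Spark's built-in higher-order functions instead. A sketch, assuming Spark 2.4+ (where map_entries, transform and sort_array are available in SQL expressions):

from pyspark.sql.functions import expr

# builds the sorted list of "key + value" strings entirely with built-ins
df.withColumn(
    "strlist",
    expr("sort_array(transform(map_entries(map), e -> concat(cast(e.key as string), e.value)))")
).show(10, False)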

