apache-spark - PySpark Compare Empty Map Literal
问题描述
I want to drop rows in a PySpark DataFrame where a certain column contains an empty map. How do I do this? I can't seem to declare a typed empty MapType against which to compare my column. I have seen that in Scala, you can use typedLit
, but there seems to be no such equivalent in PySpark. I have also tried using lit(...)
and casting to a struct<string,int>
but I have found no acceptable argument for lit()
(tried using None
which returns null and {}
which is an error).
I'm sure this is trivial but I haven't seen any docs on this!
解决方案
这是使用 pysparksize
内置函数的解决方案:
from pyspark.sql.functions import col, size
df = spark.createDataFrame(
[(1, {1:'A'} ),
(2, {2:'B'} ),
(3, {3:'C'} ),
(4, {}),
(5, None)]
).toDF("id", "map")
df.printSchema()
# root
# |-- id: long (nullable = true)
# |-- map: map (nullable = true)
# | |-- key: long
# | |-- value: string (valueContainsNull = true)
df.withColumn("is_empty", size(col("map")) <= 0).show()
# +---+--------+--------+
# | id| map|is_empty|
# +---+--------+--------+
# | 1|[1 -> A]| false|
# | 2|[2 -> B]| false|
# | 3|[3 -> C]| false|
# | 4| []| true|
# | 5| null| true|
# +---+--------+--------+
请注意,条件是size <= 0
因为在 null 的情况下,函数返回 -1(如果spark.sql.legacy.sizeOfNull
设置为 true,否则将返回 null)。在这里您可以找到更多详细信息。
通用解决方案:比较 Map 列和文字 Map
对于更通用的解决方案,我们可以将内置函数size
与 UDF 结合使用,将key + value
每个项目的字符串附加到排序列表中(感谢 @jxc 指出以前版本的问题)。这里的假设是两个映射在以下情况下相等:
- 他们有相同的大小
- 键 + 值的字符串表示在地图的项目之间是相同的
文字映射是从任意 python 字典中创建的,通过以下方式组合键和值map_from_arrays
:
from pyspark.sql.functions import udf, lit, size, when, map_from_arrays, array
df = spark.createDataFrame([
[1, {}],
[2, {1:'A', 2:'B', 3:'C'}],
[3, {1:'A', 2:'B'}]
]).toDF("key", "map")
dict = { 1:'A' , 2:'B' }
map_keys_ = array([lit(k) for k in dict.keys()])
map_values_ = array([lit(v) for v in dict.values()])
tmp_map = map_from_arrays(map_keys_, map_values_)
to_strlist_udf = udf(lambda d: sorted([str(k) + str(d[k]) for k in d.keys()]))
def map_equals(m1, m2):
return when(
(size(m1) == size(m2)) &
(to_strlist_udf(m1) == to_strlist_udf(m2)), True
).otherwise(False)
df = df.withColumn("equals", map_equals(df["map"], tmp_map))
df.show(10, False)
# +---+------------------------+------+
# |key|map |equals|
# +---+------------------------+------+
# |1 |[] |false |
# |2 |[1 -> A, 2 -> B, 3 -> C]|false |
# |3 |[1 -> A, 2 -> B] |true |
# +---+------------------------+------+
注意:正如您所见,pyspark==
运算符也可以很好地用于数组比较。
推荐阅读
- c++ - C++:如何优化标准布局类模板中的空数据成员?
- logging - 在 Gunicorn 中记录所有请求和响应标头
- java - Java(SQL Like)和Shell脚本运行2个参数的问题
- python - 打开另一帧后如何在第一帧运行程序?
- powershell - Get-ADGroup "myADgroup" -Properties myProperty - 解析输出并仅获取 CN
- asp.net-core - 如何检查 asp.net 核心方法?
- asp.net-core - 使用 ASP.NET Core 设置 azure-pipelines.yml “存储库中未找到 Web 项目”
- python - Pandas 面试问题 - 比较 Pandas-Joins 并理想地提供最快的方法
- python - 我怎么能这样写for语句:因为我不在x中
- angular - FileReader.readystate 卡在 1。Onload 未触发