Replace NULL values in a Spark DataFrame with values from a dict

Problem description

I have this dataframe and a dict of key:value pairs held in a broadcast variable (`mapping` below). I want to replace the nulls in the dataframe's 'value' column with the dict's value whose key matches that row's 'item' column.

How can this be done?

# mapping
mapping = {'temp': '70.0', 'speed': '98', 'wind': 'TRUE'}

# sample data
df = spark.createDataFrame([('2019-05-10 7:30:05', 'device1', 'event', 'temp', None),
                            ('2019-05-10 7:30:05', 'device2', 'sensor', 'speed', None),
                            ('2019-05-10 7:30:05', 'device3', 'monitor', 'wind', None),
                            ('2019-05-10 7:30:10', 'device1', 'event', 'temp', '75.2'),
                            ('2019-05-10 7:30:10', 'device2', 'sensor', 'speed', '100'),
                            ('2019-05-10 7:30:10', 'device3', 'monitor', 'wind', 'FALSE')],
                           ['date', 'name', 'type', 'item', 'value'])

# current input
+------------------+-------+-------+-----+-----+
|              date|   name|   type| item|value|
+------------------+-------+-------+-----+-----+
|2019-05-10 7:30:05|device1|  event| temp| null|
|2019-05-10 7:30:05|device2| sensor|speed| null|
|2019-05-10 7:30:05|device3|monitor| wind| null|
|2019-05-10 7:30:10|device1|  event| temp| 75.2|
|2019-05-10 7:30:10|device2| sensor|speed|  100|
|2019-05-10 7:30:10|device3|monitor| wind|FALSE|
+------------------+-------+-------+-----+-----+

# desired output
+------------------+-------+-------+-----+-----+
|              date|   name|   type| item|value|
+------------------+-------+-------+-----+-----+
|2019-05-10 7:30:05|device1|  event| temp| 70.0|
|2019-05-10 7:30:05|device2| sensor|speed|   98|
|2019-05-10 7:30:05|device3|monitor| wind| TRUE|
|2019-05-10 7:30:10|device1|  event| temp| 75.2|
|2019-05-10 7:30:10|device2| sensor|speed|  100|
|2019-05-10 7:30:10|device3|monitor| wind|FALSE|
+------------------+-------+-------+-----+-----+

Tags: python, apache-spark, pyspark

Solution


Use coalesce with create_map:

from pyspark.sql.functions import coalesce, lit, create_map, col
from itertools import chain

# flatten the dict into alternating key/value literals and build a map column
map_dict = create_map(*[lit(e) for e in chain.from_iterable(mapping.items())])
# Column<b'map(temp, 70.0, speed, 98, wind, TRUE)'>

# keep 'value' where it is not null, otherwise look up 'item' in the map
df.withColumn('value', coalesce('value', map_dict[col('item')])).show()
#+------------------+-------+-------+-----+-----+
#|              date|   name|   type| item|value|
#+------------------+-------+-------+-----+-----+
#|2019-05-10 7:30:05|device1|  event| temp| 70.0|
#|2019-05-10 7:30:05|device2| sensor|speed|   98|
#|2019-05-10 7:30:05|device3|monitor| wind| TRUE|
#|2019-05-10 7:30:10|device1|  event| temp| 75.2|
#|2019-05-10 7:30:10|device2| sensor|speed|  100|
#|2019-05-10 7:30:10|device3|monitor| wind|FALSE|
#+------------------+-------+-------+-----+-----+
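
The question mentions that the dict is held in a broadcast variable. If you want to use that broadcast handle directly instead of inlining the literals, a Python UDF can do the lookup; a minimal sketch, assuming the broadcast handle is named bc_map (a name introduced here for illustration):

from pyspark.sql.functions import coalesce, udf
from pyspark.sql.types import StringType

# hypothetical broadcast handle for the question's mapping dict
bc_map = spark.sparkContext.broadcast(mapping)

# look up the item in the broadcast dict; unknown items return None,
# so coalesce leaves those rows null
lookup = udf(lambda item: bc_map.value.get(item), StringType())

df.withColumn('value', coalesce('value', lookup('item'))).show()

That said, the create_map version stays in native Spark expressions and avoids Python serialization overhead, so it is generally the better choice when the dict is small enough to inline.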

For a very large dict, you can instead create a dataframe from it and do a left join:

from pyspark.sql.functions import coalesce, broadcast

# turn the dict into a two-column dataframe of (item, map_value) pairs
df_map = spark.createDataFrame(list(mapping.items()), ['item', 'map_value'])

# broadcast the small mapping dataframe, left-join it on 'item', then
# fill the nulls in 'value' from 'map_value' and drop the helper column
df.join(broadcast(df_map), on=['item'], how='left') \
  .withColumn('value', coalesce('value', 'map_value')) \
  .drop('map_value') \
  .show()
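
The broadcast hint ships the small mapping dataframe to every executor, so the join needs no shuffle, and the result matches the desired output shown above. As a quick illustrative check that the fill worked on this sample data:

# illustrative check: no nulls should remain in 'value',
# since every 'item' here has a key in the mapping
result = df.join(broadcast(df_map), on=['item'], how='left') \
           .withColumn('value', coalesce('value', 'map_value')) \
           .drop('map_value')
print(result.filter(result['value'].isNull()).count())  # expect 0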
