python - 将 Spark DataFrame 中的 NULL 值替换为 Dict 的键值
问题描述
我有这个dataframe
和一个键:值广播变量字典。我想根据数据框中名为“item”的不同列,用's 键值替换's 'value'null
列中的值,该列与 's 键相同。dataframe
dict
dict
如何才能做到这一点?
# mapping
dict = {'temp': '70.0', 'speed': '98', 'wind': 'TRUE'}
# sample data
df = spark.createDataFrame([('2019-05-10 7:30:05', 'device1', 'event', 'temp', None),\
('2019-05-10 7:30:05', 'device2', 'sensor', 'speed', None),\
('2019-05-10 7:30:05', 'device3', 'monitor', 'wind', None),\
('2019-05-10 7:30:10', 'device1', 'event', 'temp', '75.2'),\
('2019-05-10 7:30:10', 'device2', 'sensor', 'speed', '100'),\
('2019-05-10 7:30:10', 'device3', 'monitor', 'wind', 'FALSE'),],\
['date', 'name', 'type', 'item', 'value'])
# current input
+------------------+-------+-------+-----+-----+
| date| name| type| item|value|
+------------------+-------+-------+-----+-----+
|2019-05-10 7:30:05|device1| event| temp| null|
|2019-05-10 7:30:05|device2| sensor|speed| null|
|2019-05-10 7:30:05|device3|monitor| wind| null|
|2019-05-10 7:30:10|device1| event| temp| 75.2|
|2019-05-10 7:30:10|device2| sensor|speed| 100|
|2019-05-10 7:30:10|device3|monitor| wind|FALSE|
+------------------+-------+-------+-----+-----+
# desired output
+------------------+-------+-------+-----+-----+
| date| name| type| item|value|
+------------------+-------+-------+-----+-----+
|2019-05-10 7:30:05|device1| event| temp| 70.0|
|2019-05-10 7:30:05|device2| sensor|speed| 98|
|2019-05-10 7:30:05|device3|monitor| wind| TRUE|
|2019-05-10 7:30:10|device1| event| temp| 75.2|
|2019-05-10 7:30:10|device2| sensor|speed| 100|
|2019-05-10 7:30:10|device3|monitor| wind|FALSE|
+------------------+-------+-------+-----+-----+
解决方案
from pyspark.sql.functions import coalesce, lit, create_map, col
from itertools import chain
map_dict = create_map(*[ lit(e) for e in chain.from_iterable(dict.items()) ])
# Column<b'map(temp, 70.0, speed, 98, wind, TRUE)'>
df.withColumn('value', coalesce('value', map_dict[col('item')])).show()
#+------------------+-------+-------+-----+-----+
#| date| name| type| item|value|
#+------------------+-------+-------+-----+-----+
#|2019-05-10 7:30:05|device1| event| temp| 70.0|
#|2019-05-10 7:30:05|device2| sensor|speed| 98|
#|2019-05-10 7:30:05|device3|monitor| wind| TRUE|
#|2019-05-10 7:30:10|device1| event| temp| 75.2|
#|2019-05-10 7:30:10|device2| sensor|speed| 100|
#|2019-05-10 7:30:10|device3|monitor| wind|FALSE|
#+------------------+-------+-------+-----+-----+
对于非常大的 dict 映射,您可以创建一个数据框并进行左连接:
from pyspark.sql.functions import coalesce, broadcast
df_map = spark.createDataFrame(dict.items(), ['item', 'map_value'])
df.join(broadcast(df_map), on=['item'], how='left') \
.withColumn('value', coalesce('value', 'map_value')) \
.drop('map_value') \
.show()
推荐阅读
- reactjs - 如何在本机反应中对 svg 图像进行灰度化?
- c++ - 基于窗口位置的 Direct X 裁剪位图并将其渲染回窗口
- postgresql - SQL 错误:错误:未处理所有令牌
- python - 无法在 TensorFlow 中完全分离模型的输出
- laravel - 'composer require jenssegers/mongodb' 将 MongoDB 添加到新的 Laravel 7 项目时出错
- amazon-web-services - 预定义组的 AWS S3 对象级 ACL
- typescript - 为什么在将字符串与空值进行比较时,TS 不会说“此条件将始终返回 'false'”?
- javascript - React/TS 在一种方法上显示 YYYY/MM/DD
- assembly - 下面列出的最后一行弹出了什么值?
- javascript - 如何在特定的快递路线上设置不同的 bodyparser