python - 有没有办法将类型字典的列添加到 pyspark 中的火花数据框?
问题描述
这就是我创建具有原始数据类型的数据框的方式pyspark
:
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType
fields = [StructField('column1', IntegerType(), True), StructField('column2', IntegerType(), True)]
schema = StructType(fields)
df = spark.createDataFrame([], schema)
values = [tuple([i]) +
tuple([i])
for i in range(3)]
df = spark.createDataFrame(values, schema)
现在,如果我想要第三列包含字典数据,例如:{"1": 1.0, "2": 2.0, "3": 3.0},我该怎么办?我想创建这个数据框:
+--------------------+-----------------+------------------------------+
|column1 |column2 |column3 |
+--------------------+-----------------+------------------------------+
|1 |1 |{"1": 1.0, "2": 1.0, "3": 1.0}|
+--------------------+-----------------+------------------------------+
|2 |2 |{"1": 2.0, "2": 2.0, "3": 2.0}|
+--------------------+-----------------+------------------------------+
|3 |3 |{"1": 3.0, "2": 3.0, "3": 3.0}|
+--------------------+-----------------+------------------------------+
有一个 MapType 似乎很有帮助,但我不知道如何使用它?
并假设创建了数据框,如何根据第三列过滤它,给定一个 dict 来选择具有该 dict 值的数据框的行?
解决方案
示例如何创建:
from pyspark.sql.types import MapType, IntegerType, DoubleType, StringType, StructType, StructField
import pyspark.sql.functions as f
schema = StructType([
StructField('column1', IntegerType()),
StructField('column2', IntegerType()),
StructField('column3', MapType(StringType(), DoubleType()))])
data = [(1, 2, {'a':3.5, 'b':4.2}), (4, 8, {'b':3.7, 'e':4.9})]
df = spark.createDataFrame(data, schema=schema)
df.show()
输出:
+-------+-------+--------------------+
|column1|column2| column3|
+-------+-------+--------------------+
| 1| 2|[a -> 3.5, b -> 4.2]|
| 4| 8|[e -> 4.9, b -> 3.7]|
+-------+-------+--------------------+
关于如何过滤 DataFrame 只留下具有特定键的元素的示例(假设您在地图中没有空值并且您的 Spark 版本是 2.4+,因为早期版本没有element_at
):
filtered_df = df.where(f.element_at(df.column3, 'a').isNotNull())
输出:
+-------+-------+--------------------+
|column1|column2| column3|
+-------+-------+--------------------+
| 1| 2|[a -> 3.5, b -> 4.2]|
+-------+-------+--------------------+
我可能误解了你的问题 - 如果你的意图是只留下地图列等于你拥有的特定字典的行,那就有点棘手了。据我所知,Spark 没有对字典类型进行比较操作(这有点不寻常的操作)。有一种方法可以使用 udf 来实现它,但效率不是很高。其代码可能如下所示:
from pyspark.sql.types import MapType, IntegerType, DoubleType, StringType, StructType, StructField, BooleanType
my_dict = {'b':2.7, 'e':4.9}
from pyspark.sql.functions import udf
def map_equality_comparer(my_dict):
@udf(BooleanType())
def comparer(m):
if len(m) != len(my_dict): return False
for k, v in m.items():
if my_dict.get(k) != v: return False
return True
return comparer
filtered_df = df.where(map_equality_comparer(my_dict)(df.column3))
filtered_df.show()
如果这对您来说太慢了,您可以考虑创建字典的规范表示并进行比较(例如,将字典转换为键值对的排序数组并基于这些数组的相等性进行过滤)。
推荐阅读
- java - 用 Java 下载几个文件后,下载速度显着减慢
- python - 使用python下载pdf
- c - 如何在 C 中运行计时器
- javascript - 在 MongoDB 中将 _id 字段设置为 ObjectId,然后再将其写入 DB
- python - 在python中通过命令行强制用户输入
- python - GCD 分数代码错误且未打印出正确答案
- wpf - 如何将参数从构造函数参数传递给WPF中的viewmodel构造函数
- xaml - 我怎样才能使控件仅在绑定值不是空格时才可见?
- angular - 从 NgRx 中的 RESTful API “渐进式”加载实体
- linux - Linux Shell 脚本已执行但未返回命令提示符