python - PySpark 将字符串化的字典数组分解为行
问题描述
我有一个带有 StringType 列 ( ) 的 pyspark 数据框edges
,其中包含一个字典列表(参见下面的示例)。字典包含混合的值类型,包括另一个字典 ( nodeIDs
)。我需要将字段中的顶级字典edges
分解成行;理想情况下,我应该能够将它们的组件值转换为单独的字段。
输入:
import findspark
findspark.init()
SPARK = SparkSession.builder.enableHiveSupport() \
.getOrCreate()
data = [
Row(trace_uuid='aaaa', timestamp='2019-05-20T10:36:33+02:00', edges='[{"distance":4.382441320292239,"duration":1.5,"speed":2.9,"nodeIDs":{"nodeA":954752475,"nodeB":1665827480}},{"distance":14.48582171131768,"duration":2.6,"speed":5.6,"nodeIDs":{"nodeA":1665827480,"nodeB":3559056131}}]', count=156, level=36),
Row(trace_uuid='bbbb', timestamp='2019-05-20T11:36:10+03:00', edges='[{"distance":0,"duration":0,"speed":0,"nodeIDs":{"nodeA":520686131,"nodeB":520686216}},{"distance":8.654358326561642,"duration":3.1,"speed":2.8,"nodeIDs":{"nodeA":520686216,"nodeB":506361795}}]', count=179, level=258)
]
df = SPARK.createDataFrame(data)
期望的输出:
data_reshaped = [
Row(trace_uuid='aaaa', timestamp='2019-05-20T10=36=33+02=00', distance=4.382441320292239, duration=1.5, speed=2.9, nodeA=954752475, nodeB=1665827480, count=156, level=36),
Row(trace_uuid='aaaa', timestamp='2019-05-20T10=36=33+02=00', distance=16.134844841712574, duration=2.9,speed=5.6, nodeA=1665827480, nodeB=3559056131, count=156, level=36),
Row(trace_uuid='bbbb', timestamp='2019-05-20T11=36=10+03=00', distance=0, duration=0, speed=0, nodeA=520686131, nodeB=520686216, count=179, level=258),
Row(trace_uuid='bbbb', timestamp='2019-05-20T11=36=10+03=00', distance=8.654358326561642, duration=3.1, speed=2.8, nodeA=520686216, nodeB=506361795, count=179, level=258)
]
有没有办法做到这一点?我已经尝试使用先将字段cast
转换edges
为数组,但我无法弄清楚如何让它与混合数据类型一起使用。
我正在使用 Spark 2.4.0。
解决方案
您可以使用from_json()和schema_of_json()来推断 JSON 模式。例如:
from pyspark.sql import functions as F
# a sample json string:
edges_json_sample = data[0].edges
# or edges_json_sample = df.select('edges').first()[0]
>>> edges_json_sample
#'[{"distance":4.382441320292239,"duration":1.5,"speed":2.9,"nodeIDs":{"nodeA":954752475,"nodeB":1665827480}},{"distance":14.48582171131768,"duration":2.6,"speed":5.6,"nodeIDs":{"nodeA":1665827480,"nodeB":3559056131}}]'
# infer schema from the sample string
schema = df.select(F.schema_of_json(edges_json_sample)).first()[0]
>>> schema
#u'array<struct<distance:double,duration:double,nodeIDs:struct<nodeA:bigint,nodeB:bigint>,speed:double>>'
# convert json string to data structure and then retrieve desired items
new_df = df.withColumn('data', F.explode(F.from_json('edges', schema))) \
.select('*', 'data.*', 'data.nodeIDs.*') \
.drop('data', 'nodeIDs', 'edges')
>>> new_df.show()
+-----+-----+--------------------+----------+-----------------+--------+-----+----------+----------+
|count|level| timestamp|trace_uuid| distance|duration|speed| nodeA| nodeB|
+-----+-----+--------------------+----------+-----------------+--------+-----+----------+----------+
| 156| 36|2019-05-20T10:36:...| aaaa|4.382441320292239| 1.5| 2.9| 954752475|1665827480|
| 156| 36|2019-05-20T10:36:...| aaaa|14.48582171131768| 2.6| 5.6|1665827480|3559056131|
| 179| 258|2019-05-20T11:36:...| bbbb| 0.0| 0.0| 0.0| 520686131| 520686216|
| 179| 258|2019-05-20T11:36:...| bbbb|8.654358326561642| 3.1| 2.8| 520686216| 506361795|
+-----+-----+--------------------+----------+-----------------+--------+-----+----------+----------+
# expected result
data_reshaped = new_df.rdd.collect()
推荐阅读
- winforms - 表单按钮显示在模拟器中但不显示在智能设备中
- perl - 如何读取持续更新的日志文件并匹配 perl 脚本中的特定模式
- excel - 我想合并一些txt文件
- git - 如何以不同的方式构建但使用相同的 package.json?
- c# - 如何使用 C# 在位图上绘制单色文本
- javascript - 使用 javascript 正则表达式删除多个反斜杠,同时保留 \n 特殊字符
- oracle - 如何知道我的会话中可以使用哪些模式?
- asp.net-mvc - 使用 Webmail 功能发送电子邮件时出错
- arrays - 如何在Ruby中的每个第n个字符之后有效地分割一个很长(数百万个字符)的字符串?
- time-series - Warpscript:操作计数超过最大值 100000