apache-spark - Pyspark 爆炸嵌套列表
问题描述
我有以下数据框,我想分解值列,以便每个值都在单独的列中:
id | values
-----------------------
1 | '[[532,969020406,89],[216,969100125,23],[169,39356140000,72],[399,14407358500,188],[377,13761937166.6667,24]]'
2 | '[[532,969020406,89]]'
请注意,值列下的列表可以有不同的长度,并且它们是字符串数据类型。
所需的表应如下所示:
id | v11 | v12 | v13 | v21 | v22...
--------------------------------------
1 | 532 | 969020406 | 89 | 216 | 969100125...
2 | 532 | 969020406 | 89 | Null | Null...
我试图指定架构并使用 from_json 方法创建数组然后分解它,但我遇到了问题,即任何架构似乎都不适合我的数据
json_schema = types.StructType([types.StructField('array', types.StructType([ \
types.StructField("v1",types.StringType(),True), \
types.StructField("v2",types.StringType(),True), \
types.StructField("v3",types.StringType(),True)
]))])
json_schema = types.ArrayType(types.StructType([ \
types.StructField("v1",types.StringType(),True), \
types.StructField("v2",types.StringType(),True), \
types.StructField("v3",types.StringType(),True)
]))
json_schema = types.ArrayType(types.ArrayType(types.IntegerType()))
df.select('id', F.from_json('values', schema=json_schema)).show()
该过程仅返回 Null 值或空数组:[,,]
我还收到以下错误:StructType can not accept object '[' in type <class 'str'>
Pyspark 推断的输入数据的架构:
root
|-- id: integer (nullable = true)
|-- values: string (nullable = true)
任何帮助,将不胜感激。
解决方案
对于 Spark 2.4+,您可以结合使用split和transform将字符串转换为二维数组。然后可以将该数组的单个条目分别转换为列。
from pyspark.sql import functions as F
df2 = df.withColumn("parsed_values", F.expr("transform(split(values, '\\\\],\\\\['), " +
"c -> transform(split(c, ','), d->regexp_replace(d,'[\\\\[\\\\]]','')))"))\
.withColumn("length", F.size("parsed_values"))
max_length = df2.agg(F.max("length")).head()["max(length)"]
df2
现在有结构
root
|-- id: string (nullable = true)
|-- values: string (nullable = true)
|-- parsed_values: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
|-- length: integer (nullable = false)
并且max_length
包含一行中的最大条目数(示例数据为 5)。
parsed_value[0][1]
将返回第一个条目的第二个子条目。这将969020406
用于示例数据。
第二步是将嵌套数组转换为列。
cols = [F.col('parsed_values').getItem(x).getItem(y).alias("v{}{}".format(x+1,y+1)) \
for x in range(0, max_length) for y in range(0,3)]
df2.select([F.col('id')] + cols).show()
输出:
+---+---+---------+---+----+---------+----+----+-----------+----+----+-----------+----+----+----------------+----+
| id|v11| v12|v13| v21| v22| v23| v31| v32| v33| v41| v42| v43| v51| v52| v53|
+---+---+---------+---+----+---------+----+----+-----------+----+----+-----------+----+----+----------------+----+
| 1|532|969020406| 89| 216|969100125| 23| 169|39356140000| 72| 399|14407358500| 188| 377|13761937166.6667| 24|
| 2|532|969020406| 89|null| null|null|null| null|null|null| null|null|null| null|null|
+---+---+---------+---+----+---------+----+----+-----------+----+----+-----------+----+----+----------------+----+
max_length
如果有一种方法可以确定而不必找到完整数据的最大值,例如如果事先知道该值,则可以改进解决方案。
推荐阅读
- javascript - Vue 与 VCalendar 组件:保持数据与另一个 Vue 实例同步
- php - Symfony 2.8 - Session 和 Twig 的问题
- angular - 是否可以在 Angular 7 中找到本地网络设备
- .net - 测试 .Net Framework 解决方案结构的最佳工具/框架/方法
- html - 当有多个 tbodies 时,Safari 计算错误的顶部位置,表格中的位置为粘性
- python - AttributeError:'NoneType'对象在python中没有属性'lower'
- c# - C# 为 WPF 实现缓存
- javascript - 将 React Hooks 与 Electron JS 和 react-data-grid 一起使用时出错
- android - 在创建位图时抛出 OutOfMemoryError
- wso2 - 在单个主机中启动多个 wso2 APIM 配置文件