apache-spark - 在 pyspark 中处理复杂对象(数组)
问题描述
我试图找出在 pyspark 中处理复杂对象的可能方法。在下面的示例中,数据框的一列是整数数组。处理只是将每个值加一。这些是可接受的方法还是有更好的做法?
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
spark = SparkSession.builder.enableHiveSupport().appName('learn').getOrCreate()
data = [('a', 1, [1, 3, 5]),
('b', 2, [4, 6, 9]),
('c', 3, [50, 60, 70, 80])]
df = spark.createDataFrame(data, ['nam', 'q', 'compl'])
# process complex object, method 1 using explode and collect_list (dataframe API)
res = df.withColumn('id', f.monotonically_increasing_id()).withColumn('compl_exploded', f.explode(f.col('compl')))
res = res.withColumn('compl_exploded', f.col('compl_exploded')+1)
res = res.groupby('id').agg(f.first('nam'), f.first('q'), f.collect_list('compl_exploded').alias('compl')).drop('id')
res.show()
# process complex object, method 2 using explode and collect_list (SQL)
df.withColumn('id', f.monotonically_increasing_id()).createOrReplaceTempView('tmp_view')
res = spark.sql("""
SELECT first(nam) AS nam, first(q) AS q, collect_list(compl_exploded+1) AS compl FROM (
SELECT *, explode(compl) AS compl_exploded FROM tmp_view
) x
GROUP BY id
""")
res.show()
# process complex object, method 3 using python UDF
from typing import List
def process(x: List[int]) -> List[int]:
return [_+1 for _ in x]
process_udf = f.udf(process, ArrayType(LongType()))
res = df.withColumn('compl', process_udf('compl'))
res.show()
解决方案
对于此类操作,您可以利用构建功能。
例如,在您的用例中,您可以使用如下转换 :
pyspark<=3.0
# Option 1
import pyspark.sql.functions as f
df.withColumn('add_one',f.expr('transform(compl, x -> x+1)')).show()
+---+---+----------------+----------------+
|nam| q| compl| add_one|
+---+---+----------------+----------------+
| a| 1| [1, 3, 5]| [2, 4, 6]|
| b| 2| [4, 6, 9]| [5, 7, 10]|
| c| 3|[50, 60, 70, 80]|[51, 61, 71, 81]|
+---+---+----------------+----------------+
# OR below options , all will give same output
# Option 2
df.select('nam', 'q', 'compl' , f.expr('transform(compl, x -> x+1) as add_one')).show()
# Option 3
df.createOrReplaceTempView('tmp_view')
spark.sql( 'select nam, q, compl , transform(compl, x -> x+1) as add_one from tmp_view').show()
pyspark>=3.1.0
如果您使用的是较新版本的 spark,那么此功能很容易使用,您可以不使用expr
.
推荐阅读
- python - Weibull:R 与 Python - 结果略有不同
- vue.js - NuxtJS - 无法将文件上传按钮添加到元素的每一行
- java - 应用程序在 RoomDb 中插入数据时崩溃
- function - 这些函数中哪一个具有更高的无穷大?
- python - 将所有链接匹配为不正确 - python 验证器
- python - Django:无法访问python脚本中的静态文件
- c++ - 显示 c++ 结果的问题 & 错误的计算结果
- python - 为大型数据集计算类权重的最佳方法
- opencart-3 - 你能告诉我 Opencart Journal 主题执行路线吗?
- python - 如何获取调用者的类定义行号?