Processing complex objects (arrays) in pyspark

Problem description

I am trying to figure out possible approaches to processing complex objects in pyspark. In the example below, one column of the dataframe is an array of integers, and the processing simply adds one to each value. Are these acceptable approaches, or is there a better practice?

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
spark = SparkSession.builder.enableHiveSupport().appName('learn').getOrCreate()
data = [('a', 1, [1, 3, 5]),
        ('b', 2, [4, 6, 9]),
        ('c', 3, [50, 60, 70, 80])]
df = spark.createDataFrame(data, ['nam', 'q', 'compl'])

# process complex object, method 1 using explode and collect_list (dataframe API)
res = df.withColumn('id', f.monotonically_increasing_id()).withColumn('compl_exploded', f.explode(f.col('compl')))
res = res.withColumn('compl_exploded', f.col('compl_exploded')+1)
res = res.groupby('id').agg(f.first('nam').alias('nam'), f.first('q').alias('q'), f.collect_list('compl_exploded').alias('compl')).drop('id')
res.show()

# process complex object, method 2 using explode and collect_list (SQL)
df.withColumn('id', f.monotonically_increasing_id()).createOrReplaceTempView('tmp_view')
res = spark.sql("""
SELECT first(nam) AS nam, first(q) AS q, collect_list(compl_exploded+1) AS compl FROM (
    SELECT *, explode(compl) AS compl_exploded FROM tmp_view
    ) x
    GROUP BY id
""")
res.show()

# process complex object, method 3 using python UDF
from typing import List
from pyspark.sql.types import ArrayType, LongType

def process(x: List[int]) -> List[int]:
    return [v + 1 for v in x]

# the UDF must declare its return type so Spark knows the result schema
process_udf = f.udf(process, ArrayType(LongType()))
res = df.withColumn('compl', process_udf('compl'))
res.show()

Tags: apache-spark, pyspark, apache-spark-sql

Solution


For this kind of operation you can leverage Spark's built-in higher-order functions.

For example, in your use case you can use transform as below:

pyspark<=3.0

# Option 1 
import pyspark.sql.functions as f
df.withColumn('add_one',f.expr('transform(compl, x -> x+1)')).show()
+---+---+----------------+----------------+
|nam|  q|           compl|         add_one|
+---+---+----------------+----------------+
|  a|  1|       [1, 3, 5]|       [2, 4, 6]|
|  b|  2|       [4, 6, 9]|      [5, 7, 10]|
|  c|  3|[50, 60, 70, 80]|[51, 61, 71, 81]|
+---+---+----------------+----------------+

# OR use one of the options below; all give the same output

# Option 2 
df.select('nam', 'q', 'compl' , f.expr('transform(compl, x -> x+1) as add_one')).show()

# Option 3
df.createOrReplaceTempView('tmp_view')
spark.sql( 'select nam, q, compl , transform(compl, x -> x+1) as add_one from tmp_view').show()

pyspark>=3.1.0

If you are using a newer version of Spark, this function is easier to use: it is available natively and you can do without expr.
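For example, on pyspark >= 3.1 the same operation can be written with the native pyspark.sql.functions.transform, which takes a column and a Python lambda applied to each array element. A minimal sketch, using the column names from the question:

# Option 4 (pyspark >= 3.1): native transform, no expr needed
res = df.withColumn('add_one', f.transform('compl', lambda x: x + 1))
res.show()

The lambda receives each element as a Column expression, so x + 1 builds the same transform(compl, x -> x+1) expression shown above.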
