首页 > 解决方案 > Explode 函数正在增加 Spark DataFrame 中的作业时间

问题描述

我有一个数据框,其中一列 arrs 的数组大小接近 100000。现在我需要分解此列以获得 Array 的所有元素的唯一行。

spark.sql 的 Explode 功能正在完成这项工作,但需要足够的时间我可以尝试优化工作的任何爆炸替代方法。

 dfs.printSchema()
 println("Orginal DF")
 dfs.show()

 //Performing Explode operation
 import org.apache.spark.sql.functions.{explode,col}
 val opdfs=dfs.withColumn("explarrs",explode(col("arrs"))).drop("arrs")
 println("Exploded DF")
 opdfs.show()

预期结果应如下所示,但此代码的替代方案将更有效地优化工作。

原DF

+----+------+----+--------------------+
|col1|  col2|col3|                arrs|
+----+------+----+--------------------+
|   A|DFtest|   K|[1, 2, 3, 4, 5, 6...|
+----+------+----+--------------------+

Exploded DF
+----+------+----+--------+
|col1|  col2|col3|explarrs|
+----+------+----+--------+
|   A|DFtest|   K|       1|
|   A|DFtest|   K|       2|
|   A|DFtest|   K|       3|
|   A|DFtest|   K|       4|
|   A|DFtest|   K|       5|
|   A|DFtest|   K|       6|
|   A|DFtest|   K|       7|
|   A|DFtest|   K|       8|
|   A|DFtest|   K|       9|
|   A|DFtest|   K|      10|
|   A|DFtest|   K|      11|
|   A|DFtest|   K|      12|
|   A|DFtest|   K|      13|
|   A|DFtest|   K|      14|
|   A|DFtest|   K|      15|
|   A|DFtest|   K|      16|
|   A|DFtest|   K|      17|
|   A|DFtest|   K|      18|
|   A|DFtest|   K|      19|
|   A|DFtest|   K|      20|
+----+------+----+--------+
only showing top 20 rows

标签: scaladataframeapache-sparkdataset

解决方案


您可以使用 Dataframe 中的flatMap方法在不爆炸的情况下执行相同的操作。例如,如果您需要分解一个整数数组,您可以继续执行以下操作:

val els = Seq(Row(Array(1, 2, 3)))
val df = spark.createDataFrame(spark.sparkContext.parallelize(els), StructType(Seq(StructField("data", ArrayType(IntegerType), false))))
df.show()

它给:

+---------+
|     data|
+---------+
|[1, 2, 3]|
+---------+

使用 Dataframe 的平面图:

df.flatMap(row => row.getAs[mutable.WrappedArray[Int]](0)).show()

+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+

这样做的问题是,除了内存开销之外,您还需要将数组元素的正确类型放入getAs函数中。正如我在评论中所说,修复了一个错误:https ://issues.apache.org/jira/browse/SPARK-21657

但是如果你不能升级你的 Spark 版本,你可以尝试上面的代码并进行比较。

如果要将其他字段添加到结果中,可以执行以下操作:

val els = Seq(Row(Array(1, 2, 3), "data1", "data2"), Row(Array(1, 2, 3, 4, 5, 6), "data10", "data20"))

val df = spark.createDataFrame(spark.sparkContext.parallelize(els),
  StructType(Seq(StructField("data", ArrayType(IntegerType), false), StructField("data1", StringType, false), StructField("data2", StringType, false))))

df.show()

df.flatMap{ row =>
  val arr = row.getAs[mutable.WrappedArray[Int]](0)
  arr.map { el =>
    (row.getAs[String](1), row.getAs[String](2), el)
  }
}.show()

它给:

+------+------+---+
|    _1|    _2| _3|
+------+------+---+
| data1| data2|  1|
| data1| data2|  2|
| data1| data2|  3|
|data10|data20|  1|
|data10|data20|  2|
|data10|data20|  3|
|data10|data20|  4|
|data10|data20|  5|
|data10|data20|  6|
+------+------+---+

也许它可以提供帮助。


推荐阅读