scala - Explode 函数正在增加 Spark DataFrame 中的作业时间
问题描述
我有一个数据框,其中一列 arrs 的数组大小接近 100000。现在我需要分解此列以获得 Array 的所有元素的唯一行。
spark.sql 的 Explode 功能正在完成这项工作,但需要足够的时间我可以尝试优化工作的任何爆炸替代方法。
dfs.printSchema()
println("Orginal DF")
dfs.show()
//Performing Explode operation
import org.apache.spark.sql.functions.{explode,col}
val opdfs=dfs.withColumn("explarrs",explode(col("arrs"))).drop("arrs")
println("Exploded DF")
opdfs.show()
预期结果应如下所示,但此代码的替代方案将更有效地优化工作。
原DF
+----+------+----+--------------------+
|col1| col2|col3| arrs|
+----+------+----+--------------------+
| A|DFtest| K|[1, 2, 3, 4, 5, 6...|
+----+------+----+--------------------+
Exploded DF
+----+------+----+--------+
|col1| col2|col3|explarrs|
+----+------+----+--------+
| A|DFtest| K| 1|
| A|DFtest| K| 2|
| A|DFtest| K| 3|
| A|DFtest| K| 4|
| A|DFtest| K| 5|
| A|DFtest| K| 6|
| A|DFtest| K| 7|
| A|DFtest| K| 8|
| A|DFtest| K| 9|
| A|DFtest| K| 10|
| A|DFtest| K| 11|
| A|DFtest| K| 12|
| A|DFtest| K| 13|
| A|DFtest| K| 14|
| A|DFtest| K| 15|
| A|DFtest| K| 16|
| A|DFtest| K| 17|
| A|DFtest| K| 18|
| A|DFtest| K| 19|
| A|DFtest| K| 20|
+----+------+----+--------+
only showing top 20 rows
解决方案
您可以使用 Dataframe 中的flatMap方法在不爆炸的情况下执行相同的操作。例如,如果您需要分解一个整数数组,您可以继续执行以下操作:
val els = Seq(Row(Array(1, 2, 3)))
val df = spark.createDataFrame(spark.sparkContext.parallelize(els), StructType(Seq(StructField("data", ArrayType(IntegerType), false))))
df.show()
它给:
+---------+
| data|
+---------+
|[1, 2, 3]|
+---------+
使用 Dataframe 的平面图:
df.flatMap(row => row.getAs[mutable.WrappedArray[Int]](0)).show()
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
+-----+
这样做的问题是,除了内存开销之外,您还需要将数组元素的正确类型放入getAs函数中。正如我在评论中所说,修复了一个错误:https ://issues.apache.org/jira/browse/SPARK-21657
但是如果你不能升级你的 Spark 版本,你可以尝试上面的代码并进行比较。
如果要将其他字段添加到结果中,可以执行以下操作:
val els = Seq(Row(Array(1, 2, 3), "data1", "data2"), Row(Array(1, 2, 3, 4, 5, 6), "data10", "data20"))
val df = spark.createDataFrame(spark.sparkContext.parallelize(els),
StructType(Seq(StructField("data", ArrayType(IntegerType), false), StructField("data1", StringType, false), StructField("data2", StringType, false))))
df.show()
df.flatMap{ row =>
val arr = row.getAs[mutable.WrappedArray[Int]](0)
arr.map { el =>
(row.getAs[String](1), row.getAs[String](2), el)
}
}.show()
它给:
+------+------+---+
| _1| _2| _3|
+------+------+---+
| data1| data2| 1|
| data1| data2| 2|
| data1| data2| 3|
|data10|data20| 1|
|data10|data20| 2|
|data10|data20| 3|
|data10|data20| 4|
|data10|data20| 5|
|data10|data20| 6|
+------+------+---+
也许它可以提供帮助。
推荐阅读
- javascript - 使用 React 组件的最佳方式
- android - 使用 SharedPreference 保存后 onResume 上的变量值错误
- asp.net - Kentico 基于声明的身份验证 SAML 2.0
- jenkins - 在哪里可以找到在 Jenkins 中更改工作区路径位置的选项?
- python - 使用 pyInstaller 到闪存驱动器后指定 PyPDF2 的文件路径
- javascript - 将 JavaScript TemplateLiteral 转换为 python 字典
- javascript - HTML5:恢复播放的视频
- c++ - 内联 SSE2 程序集因数据更改而崩溃
- docker - 如何创建正确的 .lando.yml 自定义文件?
- javascript - 在 react-select 输入更改状态后,React 不会重新渲染 ComponentDidUpdate