apache-spark - 在 Spark 中,如何在不重新分配的情况下重命名数据框的列名?
问题描述
我有一个名为 dataDF 的数据框,我想重命名哪些列。其他数据框 mapDF 具有“original_name”->“code_name”映射。我想根据具有这些值的 mapDF 将 dataDF 的列名称从其“original_name”更改为“code_name”。我试图在循环中重新分配 dataDF,但是当数据量很大并且失去并行性时会产生低性能。这是否可以以更好的方式完成,以使用庞大的 dataDF 数据集实现并行性和良好的性能?
import sparkSession.sqlContext.implicits._
var dataDF = Seq((10, 20, 30, 40, 50),(100, 200, 300, 400, 500),(10, 222, 333, 444, 555),(1123, 2123, 3123, 4123, 5123),(1321, 2321, 3321, 4321, 5321))
.toDF("col_1", "col_2", "col_3", "col_4", "col_5")
dataDF.show(false)
val mapDF = Seq(("col_1", "code_1", "true"),("col_3", "code_3", "true"),("col_4", "code_4", "true"),("col_5", "code_5", "true"))
.toDF("original_name", "code_name", "important")
mapDF.show(false)
val map_of_codename = mapDF.rdd.map(x => (x.getString(0), x.getString(1))).collectAsMap()
dataDF.columns.foreach(x => {
if (map_of_codename.contains(x))
dataDF = dataDF.withColumnRenamed(x, map_of_codename.get(x).get)
else
dataDF = dataDF.withColumnRenamed(x, "None")
}
)
dataDF.show(false)
========================
dataDF
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|10 |20 |30 |40 |50 |
|100 |200 |300 |400 |500 |
|10 |222 |333 |444 |555 |
|1123 |2123 |3123 |4123 |5123 |
|1321 |2321 |3321 |4321 |5321 |
+-----+-----+-----+-----+-----+
mapDF
+-------------+---------+---------+
|original_name|code_name|important|
+-------------+---------+---------+
|col_1 |code_1 |true |
|col_3 |code_3 |true |
|col_4 |code_4 |true |
|col_5 |code_5 |true |
+-------------+---------+---------+
expected DF:
+------+----+------+------+------+
|code_1|None|code_3|code_4|code_5|
+------+----+------+------+------+
|10 |20 |30 |40 |50 |
|100 |200 |300 |400 |500 |
|10 |222 |333 |444 |555 |
|1123 |2123|3123 |4123 |5123 |
|1321 |2321|3321 |4321 |5321 |
+------+----+------+------+------+
解决方案
作为替代方案,您可以尝试使用别名,如下所示:
val aliases = dataDF.columns.map(columnName => $"${columnName}".as(map_of_codename.getOrElse(columnName, "None")))
dataDF.select(aliases: _*).show()
dataDF.select(aliases: _*).explain(true)
然后执行计划将由单个投影节点组成,它可能有助于减少优化阶段:
== Analyzed Logical Plan ==
code_1: int, None: int, code_3: int, code_4: int, code_5: int
Project [col_1#16 AS code_1#77, col_2#17 AS None#78, col_3#18 AS code_3#79, col_4#19 AS code_4#80, col_5#20 AS code_5#81]
+- Project [_1#5 AS col_1#16, _2#6 AS col_2#17, _3#7 AS col_3#18, _4#8 AS col_4#19, _5#9 AS col_5#20]
+- LocalRelation [_1#5, _2#6, _3#7, _4#8, _5#9]
话虽如此,我不确定它是否会解决性能问题,因为在这两种情况下,无论是你foreach
的还是上面的提议,由于规则,物理计划都可以优化到单个节点CollapseProject
。
仅供参考,withColumnRenamed
在后台使用类似的方法,除了它分别为每一列执行此操作:
def withColumnRenamed(existingName: String, newName: String): DataFrame = {
val resolver = sparkSession.sessionState.analyzer.resolver
val output = queryExecution.analyzed.output
val shouldRename = output.exists(f => resolver(f.name, existingName))
if (shouldRename) {
val columns = output.map { col =>
if (resolver(col.name, existingName)) {
Column(col).as(newName)
} else {
Column(col)
}
}
select(columns : _*)
} else {
toDF()
}
}
您对观察到的性能问题有任何意见吗?一些可以帮助确定需要时间的操作的措施?也许它不一定与列重命名有关?你稍后对这些重命名的列做什么?
推荐阅读
- c - 将 ASCII 连接到字符串以进行比较
- jmeter - Jmeter - 我的响应有多个相同 id 的值。我想提取这些值并将其增量传递给下一个请求
- javascript - 使用 jQuery $.getScript() 方法以给定的顺序加载多个 JS 文件
- arrays - Delphi ANSI到EBCDIC转换数组问题
- oracle - 顶点 oracle 中的 DBMS_SCHEDULER
- macos - 使安装失败 macOS Catalina
- html - 在 Chrome 中调整 `textarea` 的大小会添加一个内联 CSS 属性 `margin`,但 Firefox 没有添加它...为什么?
- r - 如何一次将R中的日期从多种格式转换?
- c# - .NET Swagger UI 主要 API 规范
- ruby - Ruby:如何在知道其(数组)路径的哈希中检索值?