How can I rename a DataFrame's columns in Spark without reassignment?

Problem description

I have a DataFrame named dataDF whose columns I want to rename. Another DataFrame, mapDF, holds an "original_name" -> "code_name" mapping. Based on the values in mapDF, I want to change each column name of dataDF from its "original_name" to the corresponding "code_name". I tried reassigning dataDF in a loop, but this performs poorly when the data volume is large and parallelism is lost. Can this be done in a better way that keeps parallelism and good performance on a huge dataDF dataset?

import sparkSession.sqlContext.implicits._

var dataDF = Seq(
  (10, 20, 30, 40, 50),
  (100, 200, 300, 400, 500),
  (10, 222, 333, 444, 555),
  (1123, 2123, 3123, 4123, 5123),
  (1321, 2321, 3321, 4321, 5321))
  .toDF("col_1", "col_2", "col_3", "col_4", "col_5")
dataDF.show(false)

val mapDF = Seq(
  ("col_1", "code_1", "true"),
  ("col_3", "code_3", "true"),
  ("col_4", "code_4", "true"),
  ("col_5", "code_5", "true"))
  .toDF("original_name", "code_name", "important")
mapDF.show(false)

val map_of_codename = mapDF.rdd.map(x => (x.getString(0), x.getString(1))).collectAsMap()

dataDF.columns.foreach(x => {
  if (map_of_codename.contains(x))
    dataDF = dataDF.withColumnRenamed(x, map_of_codename.get(x).get)
  else
    dataDF = dataDF.withColumnRenamed(x, "None")
})
dataDF.show(false)

========================
dataDF
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|10   |20   |30   |40   |50   |
|100  |200  |300  |400  |500  |
|10   |222  |333  |444  |555  |
|1123 |2123 |3123 |4123 |5123 |
|1321 |2321 |3321 |4321 |5321 |
+-----+-----+-----+-----+-----+

mapDF
+-------------+---------+---------+
|original_name|code_name|important|
+-------------+---------+---------+
|col_1        |code_1   |true     |
|col_3        |code_3   |true     |
|col_4        |code_4   |true     |
|col_5        |code_5   |true     |
+-------------+---------+---------+

expected DF:
+------+----+------+------+------+
|code_1|None|code_3|code_4|code_5|
+------+----+------+------+------+
|10    |20  |30    |40    |50    |
|100   |200 |300   |400   |500   |
|10    |222 |333   |444   |555   |
|1123  |2123|3123  |4123  |5123  |
|1321  |2321|3321  |4321  |5321  |
+------+----+------+------+------+

Tags: apache-spark, apache-spark-sql

Solution


As an alternative, you can try using aliases, like this:

val aliases = dataDF.columns.map(columnName => $"${columnName}".as(map_of_codename.getOrElse(columnName, "None")))
dataDF.select(aliases: _*).show()

dataDF.select(aliases: _*).explain(true)

The execution plan then consists of a single Project node, which may help shorten the optimization phase:

== Analyzed Logical Plan ==
code_1: int, None: int, code_3: int, code_4: int, code_5: int
Project [col_1#16 AS code_1#77, col_2#17 AS None#78, col_3#18 AS code_3#79, col_4#19 AS code_4#80, col_5#20 AS code_5#81]
+- Project [_1#5 AS col_1#16, _2#6 AS col_2#17, _3#7 AS col_3#18, _4#8 AS col_4#19, _5#9 AS col_5#20]
   +- LocalRelation [_1#5, _2#6, _3#7, _4#8, _5#9]

That said, I'm not sure this will solve the performance problem, because in both cases (your foreach version and the proposal above) the physical plan can be optimized down to a single node thanks to the CollapseProject rule.
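The same single-projection rename can also be expressed with toDF by computing every target name up front. A minimal sketch of that name computation, reusing the mapping the question builds via collectAsMap (the Spark call itself is shown as a comment, since it needs a live session):

```scala
// Same mapping the question materializes with mapDF.collectAsMap().
val mapOfCodename = Map(
  "col_1" -> "code_1", "col_3" -> "code_3",
  "col_4" -> "code_4", "col_5" -> "code_5")

val oldNames = Seq("col_1", "col_2", "col_3", "col_4", "col_5")

// Compute every target name in one pass; unmapped columns fall back to "None".
val newNames = oldNames.map(n => mapOfCodename.getOrElse(n, "None"))

// With Spark this becomes a single projection, equivalent to the alias version:
//   dataDF.toDF(newNames: _*)
println(newNames.mkString(", "))
```

Like the alias approach, this applies all renames in one select rather than one per column.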

FYI, withColumnRenamed uses a similar approach under the hood, except that it does it separately for each column:

  def withColumnRenamed(existingName: String, newName: String): DataFrame = {
    val resolver = sparkSession.sessionState.analyzer.resolver
    val output = queryExecution.analyzed.output
    val shouldRename = output.exists(f => resolver(f.name, existingName))
    if (shouldRename) {
      val columns = output.map { col =>
        if (resolver(col.name, existingName)) {
          Column(col).as(newName)
        } else {
          Column(col)
        }
      }
      select(columns : _*)
    } else {
      toDF()
    }
  }
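Since withColumnRenamed just selects aliased columns, the var reassignment in your foreach can also be replaced by a foldLeft that threads the DataFrame through each rename: columns.foldLeft(dataDF)((df, c) => df.withColumnRenamed(c, map_of_codename.getOrElse(c, "None"))). A Spark-free sketch of the same fold, where the hypothetical applyRename stands in for withColumnRenamed on a plain list of column names:

```scala
// Hypothetical stand-in for df.withColumnRenamed: returns a copy of the
// column list with one name replaced.
def applyRename(cols: Vector[String], existing: String, newName: String): Vector[String] =
  cols.map(c => if (c == existing) newName else c)

val mapOfCodename = Map(
  "col_1" -> "code_1", "col_3" -> "code_3",
  "col_4" -> "code_4", "col_5" -> "code_5")

val original = Vector("col_1", "col_2", "col_3", "col_4", "col_5")

// foldLeft threads the accumulator through every rename; with Spark the
// accumulator would be the DataFrame itself instead of a Vector of names.
val renamed = original.foldLeft(original)((acc, c) =>
  applyRename(acc, c, mapOfCodename.getOrElse(c, "None")))

println(renamed.mkString(", "))
```

This avoids the mutable var, though it still issues one rename per column, so the planner relies on CollapseProject to merge them.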

Do you have any more detail on the performance problem you observed? Any measurements that could help pinpoint which operation is taking the time? Perhaps it isn't necessarily related to the column renaming at all. What do you do with the renamed columns afterwards?

