首页 > 解决方案 > 如何删除 pyspark 中的模棱两可的列?

问题描述

有许多与此类似的问题正在就避免连接中的重复列提出不同的问题;这不是我在这里要问的。

鉴于我已经有一个包含不明确列的 DataFrame,如何删除特定列?

例如,给定:

df = spark.createDataFrame(
    spark.sparkContext.parallelize([
        [1, 0.0, "ext-0.0"],
        [1, 1.0, "ext-1.0"],
        [2, 1.0, "ext-2.0"],
        [3, 2.0, "ext-3.0"],
        [4, 3.0, "ext-4.0"],
    ]),
    StructType([
        StructField("id", IntegerType(), True),
        StructField("shared", DoubleType(), True),
        StructField("shared", StringType(), True),
    ])
)

我希望只保留数字列。

但是,尝试执行类似的操作df.select("id", "shared").show()会导致:

raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "Reference 'shared' is ambiguous, could be: shared, shared.;"

这个问题的许多相关解决方案只是“避免陷入这种情况”,例如。通过使用['joinkey']而不是a.joinkey = b.joinkey加入。我重申,这不是这里的情况。这与已经转换成这种形式的数据框有关。

来自 DF 的元数据消除了这些列的歧义:

$ df.dtypes
[('id', 'int'), ('shared', 'double'), ('shared', 'string')]

$ df.schema
StructType(List(StructField(id,IntegerType,true),StructField(shared,DoubleType,true),StructField(shared,StringType,true)))

所以数据在内部保留的......我只是看不到如何使用它。

我如何选择一列而不是另一列?

我希望能够使用,例如。col('shared#11')或类似的......但我看不到这样的东西吗?

这在火花中根本不可能吗?

要回答这个问题,我会问,请发布a)解决上述问题的工作代码片段,或b)链接到spark开发人员的官方内容,这根本不支持?

标签: apache-sparkpysparkapache-spark-sql

解决方案


解决此问题的最简单方法是使用 重命名df.toDF(...<new-col-names>...),但如果您不想更改列名,则按其类型对重复的列进行分组,struct<type1, type2>如下所示 -

请注意,以下解决方案是用 scala 编写的,但逻辑上相似的代码可以在 python 中实现。此解决方案也适用于数据框中的所有重复列-

1.加载测试数据

    val df = Seq((1, 2.0, "shared")).toDF("id", "shared", "shared")
    df.show(false)
    df.printSchema()
    /**
      * +---+------+------+
      * |id |shared|shared|
      * +---+------+------+
      * |1  |2.0   |shared|
      * +---+------+------+
      *
      * root
      * |-- id: integer (nullable = false)
      * |-- shared: double (nullable = false)
      * |-- shared: string (nullable = true)
      */

2.获取所有重复的列名

    // 1. get all the duplicated column names
    val findDupCols = (cols: Array[String]) => cols.map((_ , 1)).groupBy(_._1).filter(_._2.length > 1).keys.toSeq
    val dupCols = findDupCols(df.columns)
    println(dupCols.mkString(", "))
    // shared

3.重命名重复的列,如shared => shared:string, shared:int,而不触及其他列名

    val renamedDF = df
      // 2 rename duplicate cols like shared => shared:string, shared:int
      .toDF(df.schema
        .map{case StructField(name, dt, _, _) =>
          if(dupCols.contains(name)) s"$name:${dt.simpleString}" else name}: _*)

3. 创建所有列的结构

    // 3. create struct of all cols
    val structCols = df.schema.map(f => f.name -> f  ).groupBy(_._1)
      .map{case(name, seq) =>
        if (seq.length > 1)
          struct(
            seq.map { case (_, StructField(fName, dt, _, _)) =>
              expr(s"`$fName:${dt.simpleString}` as ${dt.simpleString}")
            }: _*
          ).as(name)
        else col(name)
      }.toSeq
     val structDF = renamedDF.select(structCols: _*)

    structDF.show(false)
    structDF.printSchema()

    /**
      * +-------------+---+
      * |shared       |id |
      * +-------------+---+
      * |[2.0, shared]|1  |
      * +-------------+---+
      *
      * root
      * |-- shared: struct (nullable = false)
      * |    |-- double: double (nullable = false)
      * |    |-- string: string (nullable = true)
      * |-- id: integer (nullable = false)
      */

4. 使用它们的类型获取列<column_name>.<datatype>

    // Use the dataframe without losing any columns
    structDF.selectExpr("id", "shared.double as shared").show(false)
    /**
      * +---+------+
      * |id |shared|
      * +---+------+
      * |1  |2.0   |
      * +---+------+
      */

希望这对某人有用!


推荐阅读