apache-spark - 如何删除 pyspark 中的模棱两可的列?
问题描述
有许多与此类似的问题正在就避免连接中的重复列提出不同的问题;这不是我在这里要问的。
鉴于我已经有一个包含不明确列的 DataFrame,如何删除特定列?
例如,给定:
df = spark.createDataFrame(
spark.sparkContext.parallelize([
[1, 0.0, "ext-0.0"],
[1, 1.0, "ext-1.0"],
[2, 1.0, "ext-2.0"],
[3, 2.0, "ext-3.0"],
[4, 3.0, "ext-4.0"],
]),
StructType([
StructField("id", IntegerType(), True),
StructField("shared", DoubleType(), True),
StructField("shared", StringType(), True),
])
)
我希望只保留数字列。
但是,尝试执行类似的操作df.select("id", "shared").show()
会导致:
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "Reference 'shared' is ambiguous, could be: shared, shared.;"
这个问题的许多相关解决方案只是“避免陷入这种情况”,例如。通过使用['joinkey']
而不是a.joinkey = b.joinkey
加入。我重申,这不是这里的情况。这与已经转换成这种形式的数据框有关。
来自 DF 的元数据消除了这些列的歧义:
$ df.dtypes
[('id', 'int'), ('shared', 'double'), ('shared', 'string')]
$ df.schema
StructType(List(StructField(id,IntegerType,true),StructField(shared,DoubleType,true),StructField(shared,StringType,true)))
所以数据是在内部保留的......我只是看不到如何使用它。
我如何选择一列而不是另一列?
我希望能够使用,例如。col('shared#11')
或类似的......但我看不到这样的东西吗?
这在火花中根本不可能吗?
要回答这个问题,我会问,请发布a)解决上述问题的工作代码片段,或b)链接到spark开发人员的官方内容,这根本不支持?
解决方案
解决此问题的最简单方法是使用 重命名df.toDF(...<new-col-names>...)
,但如果您不想更改列名,则按其类型对重复的列进行分组,struct<type1, type2>
如下所示 -
请注意,以下解决方案是用 scala 编写的,但逻辑上相似的代码可以在 python 中实现。此解决方案也适用于数据框中的所有重复列-
1.加载测试数据
val df = Seq((1, 2.0, "shared")).toDF("id", "shared", "shared")
df.show(false)
df.printSchema()
/**
* +---+------+------+
* |id |shared|shared|
* +---+------+------+
* |1 |2.0 |shared|
* +---+------+------+
*
* root
* |-- id: integer (nullable = false)
* |-- shared: double (nullable = false)
* |-- shared: string (nullable = true)
*/
2.获取所有重复的列名
// 1. get all the duplicated column names
val findDupCols = (cols: Array[String]) => cols.map((_ , 1)).groupBy(_._1).filter(_._2.length > 1).keys.toSeq
val dupCols = findDupCols(df.columns)
println(dupCols.mkString(", "))
// shared
3.重命名重复的列,如shared => shared:string, shared:int
,而不触及其他列名
val renamedDF = df
// 2 rename duplicate cols like shared => shared:string, shared:int
.toDF(df.schema
.map{case StructField(name, dt, _, _) =>
if(dupCols.contains(name)) s"$name:${dt.simpleString}" else name}: _*)
3. 创建所有列的结构
// 3. create struct of all cols
val structCols = df.schema.map(f => f.name -> f ).groupBy(_._1)
.map{case(name, seq) =>
if (seq.length > 1)
struct(
seq.map { case (_, StructField(fName, dt, _, _)) =>
expr(s"`$fName:${dt.simpleString}` as ${dt.simpleString}")
}: _*
).as(name)
else col(name)
}.toSeq
val structDF = renamedDF.select(structCols: _*)
structDF.show(false)
structDF.printSchema()
/**
* +-------------+---+
* |shared |id |
* +-------------+---+
* |[2.0, shared]|1 |
* +-------------+---+
*
* root
* |-- shared: struct (nullable = false)
* | |-- double: double (nullable = false)
* | |-- string: string (nullable = true)
* |-- id: integer (nullable = false)
*/
4. 使用它们的类型获取列<column_name>.<datatype>
// Use the dataframe without losing any columns
structDF.selectExpr("id", "shared.double as shared").show(false)
/**
* +---+------+
* |id |shared|
* +---+------+
* |1 |2.0 |
* +---+------+
*/
希望这对某人有用!