How can I use a column I created in a Spark join? - Ambiguous error

Problem description

I have been struggling with this in Scala and can't seem to find a clear solution.

I have two dataframes:

val Companies = Seq(
  (8, "Yahoo"),
  (-5, "Google"),
  (12, "Microsoft"),
  (-10, "Uber")
).toDF("movement", "Company")
val LookUpTable = Seq(
  ("B", "Buy"),
  ("S", "Sell")
).toDF("Code", "Description")

I need to create a column on Companies that lets me join to the lookup table. It is a simple CASE statement: if movement is negative then Sell, otherwise Buy. I then need to join to the lookup table on this newly created column.

val joined = Companies.as("Companies")
    .withColumn("Code",expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END"))
    .join(LookUpTable.as("LookUpTable"), $"LookUpTable.Code" === $"Code", "left_outer")

However, I keep getting the following error:

org.apache.spark.sql.AnalysisException: Reference 'Code' is ambiguous, could be: Code, LookUpTable.Code.;
  at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:259)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:101)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$40.apply(Analyzer.scala:888)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$40.apply(Analyzer.scala:890)
  at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:887)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve$2.apply(Analyzer.scala:896)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve$2.apply(Analyzer.scala:896)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:896)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$35.apply(Analyzer.scala:956)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$35.apply(Analyzer.scala:956)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105

I tried adding an alias for Code, but that did not work:

val joined = Companies.as("Companies")
    .withColumn("Code",expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END"))
    .join(LookUpTable.as("LookUpTable"), $"LookUpTable.Code" === $"Companies.Code", "left_outer")

org.apache.spark.sql.AnalysisException: cannot resolve '`Companies.Code`' given input columns: [Code, LookUpTable.Code, LookUpTable.Description, Companies.Company, Companies.movement];;
'Join LeftOuter, (Code#102625 = 'Companies.Code)
:- Project [movement#102616, Company#102617, CASE WHEN (movement#102616 > 0) THEN B ELSE S END AS Code#102629]
:  +- SubqueryAlias `Companies`
:     +- Project [_1#102613 AS movement#102616, _2#102614 AS Company#102617]
:        +- LocalRelation [_1#102613, _2#102614]
+- SubqueryAlias `LookUpTable`
   +- Project [_1#102622 AS Code#102625, _2#102623 AS Description#102626]
      +- LocalRelation [_1#102622, _2#102623]

The only workaround I have found is to alias the newly created column, but that produces an additional column in the result, which feels wrong.


val joined = Companies.as("Companies")
    .withColumn("_Code",expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END")).as("Code")
    .join(LookUpTable.as("LookUpTable"), $"LookUpTable.Code" === $"Code", "left_outer")


joined.show()

+--------+---------+-----+----+-----------+
|movement|  Company|_Code|Code|Description|
+--------+---------+-----+----+-----------+
|       8|    Yahoo|    B|   B|        Buy|
|       8|    Yahoo|    B|   S|       Sell|
|      -5|   Google|    S|   B|        Buy|
|      -5|   Google|    S|   S|       Sell|
|      12|Microsoft|    B|   B|        Buy|
|      12|Microsoft|    B|   S|       Sell|
|     -10|     Uber|    S|   B|        Buy|
|     -10|     Uber|    S|   S|       Sell|
+--------+---------+-----+----+-----------+

Is there a way to join on the newly created column without having to create a new dataframe or a new column via an alias?

Tags: scala, apache-spark

Solution


An alias is required whenever you need columns from two different dataframes that have the same name. This is because the Spark DataFrame API creates a schema for each dataframe, and within a given schema you can never have two or more columns with the same name.
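One way to see this without string aliases: keep a handle to the dataframe that owns the derived column and disambiguate with the df("col") syntax. A minimal sketch, assuming the same Companies and LookUpTable dataframes as above:

import org.apache.spark.sql.functions.expr

// Keep a reference to the dataframe that owns the derived Code column
val companiesWithCode = Companies
    .withColumn("Code", expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END"))

// companiesWithCode("Code") resolves against a specific plan,
// so it is no longer ambiguous with LookUpTable("Code")
val joined = companiesWithCode.join(
    LookUpTable,
    companiesWithCode("Code") === LookUpTable("Code"),
    "left_outer")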

This is also why, in SQL, a SELECT query without aliases works, but a CREATE TABLE AS SELECT on the same query would throw an error like duplicate columns.
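For the question as asked, no extra column is needed: apply the dataframe alias after withColumn, so the derived Code column is covered by the alias and both sides of the join condition can be qualified. A sketch under the same assumptions (with spark.implicits._ in scope for the $"..." syntax):

import org.apache.spark.sql.functions.expr

val joined = Companies
    .withColumn("Code", expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END"))
    .as("Companies")  // alias AFTER withColumn, so Companies.Code now resolves
    .join(LookUpTable.as("LookUpTable"),
          $"Companies.Code" === $"LookUpTable.Code",
          "left_outer")

// Optionally keep only one copy of Code in the output
joined.select($"Companies.*", $"LookUpTable.Description").show()

The key difference from the code in the question is the position of .as("Companies"): applied before withColumn, the alias does not cover the derived column, which is why $"Companies.Code" could not be resolved there.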
