scala - How do I use a column I created in a Spark join? - Ambiguous reference error
Problem description
I've been struggling with this problem in Scala, but I can't seem to find a clear solution.
I have two DataFrames:
val Companies = Seq(
(8, "Yahoo"),
(-5, "Google"),
(12, "Microsoft"),
(-10, "Uber")
).toDF("movement", "Company")
val LookUpTable = Seq(
("B", "Buy"),
("S", "Sell")
).toDF("Code", "Description")
I need to create a column in Companies that lets me join to the lookup table. It's a simple case statement that checks whether movement is negative, in which case it's a Sell, otherwise a Buy. I then need to join to the lookup table on this newly created column.
val joined = Companies.as("Companies")
.withColumn("Code",expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END"))
.join(LookUpTable.as("LookUpTable"), $"LookUpTable.Code" === $"Code", "left_outer")
However, I keep getting the following error:
org.apache.spark.sql.AnalysisException: Reference 'Code' is ambiguous, could be: Code, LookUpTable.Code.;
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:259)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:101)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$40.apply(Analyzer.scala:888)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$40.apply(Analyzer.scala:890)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:887)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve$2.apply(Analyzer.scala:896)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve$2.apply(Analyzer.scala:896)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:896)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$35.apply(Analyzer.scala:956)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$35.apply(Analyzer.scala:956)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105
I tried adding an alias for Code, but that didn't work:
val joined = Companies.as("Companies")
.withColumn("Code",expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END"))
.join(LookUpTable.as("LookUpTable"), $"LookUpTable.Code" === $"Companies.Code", "left_outer")
org.apache.spark.sql.AnalysisException: cannot resolve '`Companies.Code`' given input columns: [Code, LookUpTable.Code, LookUpTable.Description, Companies.Company, Companies.movement];;
'Join LeftOuter, (Code#102625 = 'Companies.Code)
:- Project [movement#102616, Company#102617, CASE WHEN (movement#102616 > 0) THEN B ELSE S END AS Code#102629]
: +- SubqueryAlias `Companies`
: +- Project [_1#102613 AS movement#102616, _2#102614 AS Company#102617]
: +- LocalRelation [_1#102613, _2#102614]
+- SubqueryAlias `LookUpTable`
+- Project [_1#102622 AS Code#102625, _2#102623 AS Description#102626]
+- LocalRelation [_1#102622, _2#102623]
The only workaround I've found is to alias the newly created column, but that creates an extra column, which doesn't feel right.
val joined = Companies.as("Companies")
.withColumn("_Code",expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END")).as("Code")
.join(LookUpTable.as("LookUpTable"), $"LookUpTable.Code" === $"Code", "left_outer")
joined.show()
+--------+---------+-----+----+-----------+
|movement| Company|_Code|Code|Description|
+--------+---------+-----+----+-----------+
| 8| Yahoo| B| B| Buy|
| 8| Yahoo| B| S| Sell|
| -5| Google| S| B| Buy|
| -5| Google| S| S| Sell|
| 12|Microsoft| B| B| Buy|
| 12|Microsoft| B| S| Sell|
| -10| Uber| S| B| Buy|
| -10| Uber| S| S| Sell|
+--------+---------+-----+----+-----------+
Is there a way to join on the newly created column without having to create a new DataFrame or a new column via an alias?
Solution
If you need columns from two different dataframes having the same name, you will need to use an alias. This is because the Spark DataFrame API creates a schema for each DataFrame, and within a given schema you can never have two or more columns with the same name. This is also why, in SQL, a SELECT query without aliases works, but a CREATE TABLE AS SELECT over the same query throws an error like "duplicate columns".
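A minimal sketch of the aliased join, assuming Spark 2.x with spark.implicits._ in scope (the SparkSession setup below is illustrative). The key point is that the .as("Companies") alias is applied after withColumn, so the derived Code column is covered by that alias and both sides of the join condition can be qualified unambiguously:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val Companies = Seq(
  (8, "Yahoo"),
  (-5, "Google"),
  (12, "Microsoft"),
  (-10, "Uber")
).toDF("movement", "Company")

val LookUpTable = Seq(
  ("B", "Buy"),
  ("S", "Sell")
).toDF("Code", "Description")

// Alias the DataFrame AFTER withColumn, so the derived Code column
// falls under the "Companies" alias and can be qualified in the join.
val joined = Companies
  .withColumn("Code", expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END"))
  .as("Companies")
  .join(LookUpTable.as("LookUpTable"),
        $"Companies.Code" === $"LookUpTable.Code",
        "left_outer")

joined.show()

With the alias applied at that point, no extra _Code column is needed and each company matches only its own Buy or Sell row. The lookup table's Code column still appears in the result, as with any join on an expression; if only a single Code column is wanted, the usingColumns form, join(LookUpTable, Seq("Code"), "left_outer"), collapses the two into one.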