scala - 如何更新数据框中的多个列
问题描述
在此处更新解决方案: https ://stackoverflow.com/a/61802973/12322995
在 Spark Scala 中实现以下 sql 查询时需要帮助
Update testScores
SET score1 = a.score1,
source1 = 'FoundOnline',
score2 = a.score2,
source2 = 'FoundOnline',
score3 = a.score3,
source3 = 'FoundOnline'
FROM
(select * from tempScores) a
WHERE testScores.Source1 != 'missing'
and a.score1 > 0
以下是我尝试并坚持的内容,因为对如何在相同条件下更新分数和来源列感到困惑
我有两个数据框反映查询中两个表的数据
testScoresDF
,tempScoresDF
val newResults = testScoresDF.as("test").join(tempScoresDF.as("temp"), Seq("id","products"), "left")
.withColumn("temp_score1",
when($"temp.score1".gt(0) and($"test.Souce1".notEqual("real")), $"temp.Score1")
.otherwise($"test.Score1"))
.withColumn("temp_score2",
when($"temp.score1".gt(0) and($"test.Souce1".notEqual("real")), $"temp.Score2")
.otherwise($"test.Score2"))
.withColumn("temp_score3",
when($"temp.score1".gt(0) and($"test.Souce1".notEqual("real")), $"temp.Score3")
.otherwise($"test.Score3"))
这是两个表的架构。id和products用来加入
testScores
|-- id: integer (nullable = true)
|-- products: string (nullable = true)
|-- score1: float (nullable = true)
|-- score2: float (nullable = true)
|-- score3: float (nullable = true)
|-- source1: string (nullable = true)
|-- source2: string (nullable = true)
|-- source3: string (nullable = true)
tempScores
|-- id: string (nullable = true)
|-- products: string (nullable = true)
|-- score1: float (nullable = true)
|-- score2: float (nullable = true)
|-- score3: float (nullable = true)
OutputSchema 应该与具有更新值的 testScores 模式相同。
Output
|-- id: integer (nullable = true)
|-- products: string (nullable = true)
|-- score1: float (nullable = true)
|-- score2: float (nullable = true)
|-- score3: float (nullable = true)
|-- source1: string (nullable = true)
|-- source2: string (nullable = true)
|-- source3: string (nullable = true)
提前致谢。
解决方案
使用 withColumn 可以工作,但它会有很多重复的代码和条件。
因此,这是在需要时效果更好的解决Update
方案DataFrame
fold or foldLeft or foldRight
在列列表需要一些更新的任何情况下,使用都应该有效。折叠 vs foldLeft vs foldRight
简而言之 - 如果有一个包含列或值的列表,fold - will read one index at a time randomly
, foldLeft - will read the list of values beginning from left to right
,foldRight - is opposite to foldLeft.Reads from Right to Left
解决方案:
1)。首先左连接两个数据框。左连接是因为我们只需要更新满足条件的值,否则保持原样。
2)。cols: List[String]
有一个要更新的列列表。这里List("score1", "score2", "score3","Source1","Source2","Source3")
3)。foldLeft 获取joinedDF 和要更新的列列表,并从列表中一次遍历一个值。
4)。如果列名有source
然后去如果条件和更新source values
,否则更新scores values
5)。最后,您可以重命名temp_score
列名或从正在返回的数据框中选择所需的列
def updateValues(testScoresDF: DataFrame, tempScoresDF: DataFrame, cols: List[String]) = {
val joinDF = testScoresDF.as("test")
.join(tempScoresDF.as("temp"), Seq("id", "products"), "left")
val updateValuesDF = cols.foldLeft(joinDF) {
(df, col) =>
if (col.contains("source")) {
df.withColumn(col,
when($"temp.score1".gt(0) && $"Source1".notEqual("missing"), "FoundOnline")
.otherwise($"$col"))
}
else {
df.withColumn("temp_" + col,
when($"temp.score1".gt(0) && $"Source1".notEqual("missing"), $"temp.$col")
.otherwise($"test.$col"))
}
}
updateValuesDF
}