首页 > 解决方案 > 如何使用 Apache Spark Dataframes 写入 IN 和 NOT IN

问题描述

我在 SQL 中有以下 2 个 SQL 查询示例:

a) update DBTABLE1
    set col1 = 'Yes'
where ID IN ( '100' ) and City = any(select City from DBTable2 where Country = 'USA');

b) update DBTABLE1
    set col2 = 'No'
where ID NOT IN ( '100' ) and City = any(select City from DBTable2 where Country = 'USA');


How to write above 2 SQLs using Apache Spark Dataframes (Not Select subquery etc). A dataframe is already having these 2 columns - col1 and col2, I am changing their values using WITHCOLUMN and WHEN clause. 

CitiDF 包含城市数量的数据集。

I tried below but giving compile errors:

c) This is for (a) above:

withcolumn(col("col1"),when(col("id") === lit("100") 
and col("city").isin(CitiDF("city")), lit("yes")))

d) This is for (b) above:

withcolumn(col("col2"),when(col("id") === lit("100") 
and ! (col("city").isin(CitiDF("city"))), lit("yes")))

标签: sqlscalaapache-sparkdataframeapache-spark-sql

解决方案


为了使事情更具体,让我们考虑一些玩具数据。我们有一个名为 的 DataFrame df,如下所示:

+---+---------+------+------+
| id|     city|  col1|  col2|
+---+---------+------+------+
|100|Frankfurt|filler|filler|
|200|   Berlin|filler|filler|
|100|   Vienna|filler|filler|
|500| Victoria|filler|filler|
|600| Shanghai|filler|filler|
|100|  Cologne|filler|filler| 
+---+---------+------+------+

另一个名为cities,看起来像这样:

+---------+
| cityName|
+---------+
|Frankfurt|
|   Vienna|
+---------+

我们可以像这样进行您的查询:

val cityList = cities.collect.map(x => x(0))
val df1 = df.withColumn("col1", when($"id" === "100" and $"city".isin(cityList: _*), "yes"))

我们得到的结果是:

+---+---------+----+------+
| id|     city|col1|  col2|
+---+---------+----+------+
|100|Frankfurt| yes|filler|
|200|   Berlin|null|filler|
|100|   Vienna| yes|filler|
|500| Victoria|null|filler|
|600| Shanghai|null|filler|
|100|  Cologne|null|filler|
+---+---------+----+------+

对于您的第二个查询,我们使用相同的cityList

val df2 = df.withColumn("col2", when($"id" === "100" and !$"city".isin(cityList: _*), "yes"))

给我们

+---+---------+------+----+
| id|     city|  col1|col2|
+---+---------+------+----+
|100|Frankfurt|filler|null|
|200|   Berlin|filler|null|
|100|   Vienna|filler|null|
|500| Victoria|filler|null|
|600| Shanghai|filler|null|
|100|  Cologne|filler| yes|
+---+---------+------+----+

但是,这种方法有一个很大的警告。如果城市数量很大,您可能会通过收集所有名称来耗尽内存。相反,我会考虑使用另一种方法,例如外连接:

df.join(cities, df("city") === cities("cityName"), "outer").
  withColumn("col1", when($"cityName".isNotNull and $"id" === "100", "yes")).
  withColumn("col2", when($"cityName".isNull and $"id" === "100", "yes")).
  drop("cityName")

给我们

+---+---------+----+----+
| id|     city|col1|col2|
+---+---------+----+----+
|100|Frankfurt| yes|null|
|500| Victoria|null|null|
|200|   Berlin|null|null|
|100|   Vienna| yes|null|
|100|  Cologne|null| yes|
|600| Shanghai|null|null|
+---+---------+----+----+

是的,它引入了一个额外的列,但只是暂时的,并且避免了将潜在的大量城市列表拉入驾驶员的记忆中。


推荐阅读