首页 > 解决方案 > SELECT 语句中的 Spark IN/EXISTS 谓词

问题描述

我有以下 Spark SQL 测试查询:

Seq("france").toDF.createOrReplaceTempView("countries")
SELECT CASE WHEN country = 'italy' THEN 'Italy' 
    ELSE ( CASE WHEN country IN (FROM countries) THEN upperCase(country) ELSE country END ) 
    END AS country FROM users

引发以下错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: 
    IN/EXISTS predicate sub-queries can only be used in a Filter

查询的以下部分CASE WHEN country IN (FROM countries)是其原因。

country IN (FROM countries)为了在选择条件中进行模拟,Spark SQL 中是否存在任何解决方法?我对纯 SQL 实现感兴趣,而不是通过 API 实现。

标签: apache-sparkapache-spark-sql

解决方案


这是正确的 SQL 查询:

import sparkSession.implicits._

Seq("france").toDF("country").createOrReplaceTempView("countries")
Seq(("user1", "france"), ("user2", "italy"), ("user2", "usa"))
  .toDF("user", "country").createOrReplaceTempView("users")

val query =
  s"""
     |SELECT
     |  CASE
     |    WHEN u.country = 'italy' THEN 'Italy'
     |    ELSE (
     |      CASE
     |        WHEN u.country = c.country THEN upper(u.country)
     |        ELSE u.country
     |      END
     |    ) END AS country
     |FROM users u
     |LEFT JOIN countries c
     |  ON u.country = c.country
  """.stripMargin
sparkSession.sql(query).show()

结果:

+-------+
|country|
+-------+
| FRANCE|
|  Italy|
|    usa|
+-------+

IN/EXISTS您只能在谓词中使用 sql 运算符的原因是:投影中的逻辑(CASE-WHEN在我们的例子中)为从选择返回的数据集中的每一行评估。CASE WHEN country IN (SELECT * FROM countries)考虑到这一点,对表中的每一行运行等效并不是最好的主意users。因此,SQL 在语言级别(sql 解析器引擎)上防止了这种情况。


推荐阅读