Correlated subquery column is not allowed as part of a non-equality predicate in Spark SQL

Problem description

I am trying to write a subquery in a WHERE clause, as shown below, but I get "Correlated column is not allowed in a non-equality predicate:".

SELECT *,
       holidays
FROM   (
         SELECT *,
                s.holidays,
                s.entity
         FROM   transit_t tt
         WHERE  (
                  SELECT Count(thedate) AS holidays
                  FROM   fact_ent_rt
                  WHERE  entity = tt.awborigin
                  AND    Substring(thedate, 1, 10) BETWEEN Substring(awbpickupdate, 1, 10)
                                                       AND Substring(deliverydate, 1, 10)
                  AND    ( nholidayflag = true
                           OR weekendflag = true ) )
       ) s

Is there any problem with this query? I thought Spark > 2.0 supports subqueries in the WHERE clause. Any suggestions would be appreciated. Thanks.

The input is the pickup date and delivery date from the transit table. We need to find out whether there is a weekend between those dates (that data is available in fact_ent_rt) and count the number of holidays.

The output I get is pyspark.sql.utils.AnalysisException: u"Correlated column is not allowed in a non-equality predicate:\nAggregate
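
For reference, the error comes from a restriction in Spark's subquery decorrelation: columns from the outer query may only appear in equality predicates inside a correlated subquery, and the BETWEEN on awbpickupdate/deliverydate is a range comparison. A minimal sketch of the restriction (assuming a Spark 2.x shell where spark is in scope; the tables t1/t2 and their columns are made up for illustration):

// Register two tiny hypothetical tables.
import spark.implicits._
Seq((1, 10), (2, 20)).toDF("id", "lo").createOrReplaceTempView("t1")
Seq((1, 15), (2, 5)).toDF("id", "v").createOrReplaceTempView("t2")

// OK: the only predicate on the outer column t1.id is an equality.
spark.sql("SELECT id, (SELECT count(*) FROM t2 WHERE t2.id = t1.id) AS c FROM t1").show()

// Fails with the same AnalysisException: the outer column t1.lo appears in a
// range (non-equality) predicate inside the correlated subquery.
// spark.sql("SELECT id, (SELECT count(*) FROM t2 WHERE t2.v > t1.lo) AS c FROM t1").show()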

Sample input:

Input 1:

+---------+-------------------+-------------------+
|AWBOrigin|      AWBPickupDate|       DeliveryDate|
+---------+-------------------+-------------------+
|      LON|2018-09-01 08:52:00|2018-09-12 13:57:00|
|      DHA|2018-09-04 11:47:00|2018-09-08 07:30:00|
|      NIC|2009-01-01 01:47:00|2009-01-09 11:37:00|
+---------+-------------------+-------------------+

Input 2 (fact_ent):

+------+-------------------+-----------+------------+
|Entity|            TheDate|WeekendFlag|NHolidayFlag|
+------+-------------------+-----------+------------+
|   NIC|2009-01-01 00:00:00|      False|       False|
|   NIC|2009-01-02 00:00:00|      False|       False|
|   NIC|2009-01-03 00:00:00|       True|       False|
|   NIC|2009-01-04 00:00:00|       True|       False|
|   NIC|2009-01-05 00:00:00|      False|       False|
|   NIC|2009-01-06 00:00:00|      False|       False|
|   NIC|2009-01-07 00:00:00|      False|       False|
|   NIC|2009-01-08 00:00:00|      False|       False|
|   NIC|2009-01-09 00:00:00|      False|       False|
|   NIC|2009-01-10 00:00:00|       True|       False|
|   NIC|2009-01-11 00:00:00|       True|       False|
|   NIC|2009-01-12 00:00:00|      False|       False|
|   NIC|2009-01-13 00:00:00|      False|       False|
|   NIC|2009-01-14 00:00:00|      False|       False|
|   NIC|2009-01-15 00:00:00|      False|       False|
|   NIC|2009-01-16 00:00:00|      False|       False|
|   NIC|2009-01-17 00:00:00|       True|       False|
|   NIC|2009-01-18 00:00:00|       True|       False|
|   NIC|2009-01-19 00:00:00|      False|       False|
|   NIC|2009-01-20 00:00:00|      False|       False|
+------+-------------------+-----------+------------+

Expected output:

+---------+-------------------+-------------------+--------+
|AWBOrigin|      AWBPickupDate|       DeliveryDate|Holidays|
+---------+-------------------+-------------------+--------+
|      LON|2018-09-01 08:52:00|2018-09-12 13:57:00|      NA|
|      DHA|2018-09-04 11:47:00|2018-09-08 07:30:00|      NA|
|      NIC|2009-01-01 01:47:00|2009-01-09 11:37:00|       2|
+---------+-------------------+-------------------+--------+

Tags: apache-spark, apache-spark-sql, pyspark-sql

Solution


I did this in Scala, so you will need to convert it, but I think it does the job in a much easier way. I added a key and worked at the key level; you can adapt that and aggregate. The principle is much simpler: no correlated subquery is needed, just relational calculus, with numbers standing in for dates and so on.

// SCALA
// Slightly ambiguous on holidays vs. weekends; as stated, both are treated as 1.

import spark.implicits._
import org.apache.spark.sql.functions._

// Calendar data: entity, day number, weekend flag, holiday flag.
val dfE = Seq(
  ("NIC", 1, false, false),
  ("NIC", 2, false, false),
  ("NIC", 3, true,  false),
  ("NIC", 4, true,  true),
  ("NIC", 5, false, false),
  ("NIC", 6, false, false),
  ("XYZ", 1, false, true)
).toDF("e", "d", "w", "h")
//dfE.show(false)

// Collapse the two flags into a single 0/1 indicator.
val dfE2 = dfE.withColumn("wh", when($"w" or $"h", 1).otherwise(0)).drop("w").drop("h")
//dfE2.show()

// Transit data: entity, pickup day, delivery day, synthetic key.
// Assuming more dfD rows can exist.
val dfD = Seq(
  ("NIC", 1, 4, "k1"),
  ("NIC", 2, 3, "k2"),
  ("NIC", 1, 1, "k3"),
  ("NIC", 7, 10, "k4")
).toDF("e", "pd", "dd", "k")
//dfD.show(false)

dfE2.createOrReplaceTempView("E2")
dfD.createOrReplaceTempView("D1")

// This is done per record. If identical keys occur, strip k and aggregate;
// I added k so each entry can be checked individually. The point is that this
// is far easier: the key is just a synthetic handle to group by.

val q = spark.sql("""
  SELECT d1.k, d1.e, d1.pd, d1.dd, sum(e2.wh)
    FROM D1, E2
   WHERE D1.e = E2.e
     AND E2.d >= D1.pd
     AND E2.d <= D1.dd
GROUP BY d1.k, d1.e, d1.pd, d1.dd
ORDER BY d1.k, d1.e, d1.pd, d1.dd
""")
q.show

Returns:

 +---+---+---+---+-------+
 |  k|  e| pd| dd|sum(wh)|
 +---+---+---+---+-------+
 | k1|NIC|  1|  4|      2|
 | k2|NIC|  2|  3|      1|
 | k3|NIC|  1|  1|      0|
 +---+---+---+---+-------+
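
Note that k4 (NIC, days 7..10) does not appear in this result: the inner join finds no matching calendar rows for it, so the whole group disappears. If rows without calendar coverage should survive with an empty count, as with the NA rows in the expected output, a LEFT JOIN variant of the same query keeps them with a null sum (a sketch against the D1/E2 views above; qLeft is just a name chosen here):

// Same aggregation, but a LEFT JOIN preserves transit rows with no calendar match.
val qLeft = spark.sql("""
  SELECT d1.k, d1.e, d1.pd, d1.dd, sum(e2.wh) AS holidays
    FROM D1
    LEFT JOIN E2
      ON D1.e = E2.e
     AND E2.d >= D1.pd
     AND E2.d <= D1.dd
GROUP BY d1.k, d1.e, d1.pd, d1.dd
ORDER BY d1.k
""")
qLeft.show   // k4 now appears with holidays = null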

I think simple performance improvements can be made; in fact, nothing correlated is required.

If needed, AND E2.d BETWEEN D1.pd AND D1.dd can be used.
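
Putting this back onto the question's own tables, the same join-plus-GROUP BY shape with BETWEEN would look roughly like the sketch below. This is only a sketch: transit_t, fact_ent_rt and their column names are taken from the question, the substring-based date handling mirrors the original query, and holidaysDf is an arbitrary name. A null in the holidays column corresponds to the NA rows of the expected output.

// Sketch: join the transit rows to the calendar on entity and date range,
// then count weekend/holiday days per transit row (names taken from the question).
val holidaysDf = spark.sql("""
  SELECT t.awborigin, t.awbpickupdate, t.deliverydate,
         SUM(CASE WHEN f.weekendflag OR f.nholidayflag THEN 1 ELSE 0 END) AS holidays
    FROM transit_t t
    LEFT JOIN fact_ent_rt f
      ON f.entity = t.awborigin
     AND substring(f.thedate, 1, 10)
         BETWEEN substring(t.awbpickupdate, 1, 10)
             AND substring(t.deliverydate, 1, 10)
GROUP BY t.awborigin, t.awbpickupdate, t.deliverydate
""")
holidaysDf.show(false)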

