首页 > 解决方案 > 不使用内部联接的 Sql 查询

问题描述

我有一个火花数据集输入DS Dataset<Row>,如下所示

  +---------------+---------------+----------------+-------+--------------+--------+
  |  time         | thingId       |     controller | module| variableName |  value |
  +---------------+---------------+----------------+-------+--------------+--------+
  |1554188276412  |  0002019000000|        0       | 0     |Voltage       |    9   |
  |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
  +---------------+---------------+----------------+-------+--------------+--------+

STEP 1生成

Dataset<Row> inputDS = readInput.groupby("thingId","controller","module","variableName").agg(max(struct("time","value")).as("time_value_struct")).select("thingId","controller","module","variableName","time_value_struct.*");

预期产出

 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188639406  |  0002019000000|        0       | 0     |Voltage       |    9   |
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 +---------------+---------------+----------------+-------+--------------+--------+

Max(time)专栏thingId,controller,module and variableName

time最终目标是根据 MA​​X( ) 列获取每个 thingId、控制器、模块和变量名的最后更新值。

代码

inputDS.createOrReplaceTempView("intermediate");

Dataset<Row> outputDS = spark.sql("select B.time,A.thingId,A.controller,A.module,A.variableName,A.value from intermediate A 
inner join (select thingId,controller,module,MAX(time)time from intermediate group by thingId,controller,module) B 
on A.thingId=B.thingId and A.controller=B.controller and A.module=B.module");

SQL 查询按预期工作,但使用inner join看起来效率不高

1)是否有任何其他有效的方法可以在没有内部连接或等效条件的情况下获得预期输出。

2) 如果我们能够从步骤 1中获得预期的输出,那就太好了

 Dataset<Row> intermediate = inputDS.groupby("thingId","controller","module","variableName").agg(max(struct("time","value")).as("time_value_struct")).select("thingId","controller","module","variableName","time_value_struct.*");

标签: sqlapache-sparkapache-spark-sql

解决方案


这是您当前连接查询的变体,它依赖于ROW_NUMBER

SELECT time, thingId, controller, module, variableName, "value"
FROM
(
    SELECT t.*, ROW_NUMBER() OVER (PARTITION BY thingId, controller, module
                                   ORDER BY time DESC) rn
    FROM intermediate
) t
WHERE rn = 1;

分析函数通常可以击败较老的方法,例如联接。


推荐阅读