How to select the maximum value from a Spark dataframe within a specific time range

Problem Description

I have records for some shipments. Each record has container_no, origin, destination, shipment_dt and volume columns.

There can be multiple records with the same container_no, and the same container may have been shipped on different dates. If the shipment_dt values for a container_no fall within a 10-day span, check the origin: if all of those records have different origins, drop all the records with that container_no in the 10-day span; otherwise keep only the record with the highest volume.

Please note: records are compared only within the same container_no.
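A minimal plain-Scala sketch of that rule (not the Spark query itself; `Rec` and `selectFromGroup` are hypothetical names), applied to one group of records sharing a container_no and a 10-day window:

```scala
case class Rec(containerNo: String, origin: String, destination: String,
               shipmentDt: String, volume: Int)

// Selection rule within one (container_no, 10-day window) group:
// if every record has a different origin, drop the whole group;
// otherwise keep only the record with the highest volume.
def selectFromGroup(group: Seq[Rec]): Seq[Rec] = {
  val allOriginsDiffer =
    group.size > 1 && group.map(_.origin).distinct.size == group.size
  if (allOriginsDiffer) Seq.empty
  else Seq(group.maxBy(_.volume))
}
```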
Sample input:
[screenshot: sample input table]

Expected output:
[screenshot: sample output table]

Origin/destination condition for selecting data:
[screenshot: origin/destination condition]

I have written a query that buckets the records into 10-day windows, but I don't know how to compare origins and destinations within a window and pick the record with the maximum volume.

Sample input query used to create the dataframe:

val Input_DF = spark.sql("""
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-10' AS shipment_dt , 20 as volume UNION
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-10' AS shipment_dt , 30 as volume UNION
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-12' AS shipment_dt , 10 as volume UNION
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-25' AS shipment_dt , 20 as volume UNION
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-26' AS shipment_dt , 10 as volume UNION
    SELECT '12346' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-15' AS shipment_dt , 20 as volume UNION
    SELECT '12346' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-16' AS shipment_dt , 20 as volume UNION
    SELECT '12346' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-17' AS shipment_dt , 50 as volume UNION
    SELECT '12347' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-18' AS shipment_dt , 20 as volume UNION
    SELECT '12347' AS container_no , 'Nepal' AS origin ,'China' AS destination , '2020-10-19' AS shipment_dt , 21 as volume""")

Input_DF.createOrReplaceTempView("Input_DF")

Query that assigns 10-day windows to the data:

val output_df = spark.sql("""
            SELECT
                      B.* ,
                      CASE
                          WHEN from_prev BETWEEN 0 AND 9
                          THEN 1
                          ELSE 0
                      END                     AS recent ,
                      floor(from_first / 10 ) AS recent_group
                  FROM
                      (
                          SELECT
                              A.*,
                              NVL(DATEDIFF(shipment_dt,FIRST(shipment_dt) over(partition BY container_no
                              ORDER BY shipment_dt ASC)) ,0) AS from_first,
                              NVL(DATEDIFF(shipment_dt,lag(shipment_dt,1) over(partition BY container_no
                              ORDER BY shipment_dt ASC)) ,0) from_prev
                          FROM
                              Input_DF A) B 
                  ORDER BY
                    container_no,
                    shipment_dt""")

In the sample-input screenshot I added an extra column to explain how a row differs from another record with the same container_no but a different date. Thanks in advance.

Tags: sql, scala, apache-spark, apache-spark-sql

Solution
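The original answer body is missing from this capture. As a sketch of one way to finish the asker's own approach: register output_df as a view, then, per (container_no, recent_group) window, count the rows and the distinct origins and rank by volume. Spark SQL does not support COUNT(DISTINCT ...) as a window function, so SIZE(COLLECT_SET(...)) stands in for the distinct count here:

```scala
output_df.createOrReplaceTempView("output_df")

val final_df = spark.sql("""
    SELECT container_no, origin, destination, shipment_dt, volume
    FROM
        (
            SELECT
                C.*,
                COUNT(*) OVER (PARTITION BY container_no, recent_group) AS grp_cnt,
                SIZE(COLLECT_SET(origin) OVER (PARTITION BY container_no, recent_group)) AS origin_cnt,
                ROW_NUMBER() OVER (PARTITION BY container_no, recent_group
                ORDER BY volume DESC) AS rn
            FROM
                output_df C) D
    WHERE
        NOT (grp_cnt > 1 AND grp_cnt = origin_cnt) -- all origins differ: drop the group
        AND rn = 1                                 -- otherwise keep the max-volume row
""")
```

On the sample input this should keep (12345, 2020-10-10, volume 30), (12345, 2020-10-25, volume 20) and (12346, 2020-10-17, volume 50), and drop both 12347 rows, since their origins (India, Nepal) all differ within the window.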
