首页 > 解决方案 > PySpark DataFrame 根据另一列中时间戳值的最小/最大条件更新列值

问题描述

我有以下数据框:

col1        col2 
            
    1   2020-02-27 15:00:00 

    1   2020-02-27 15:04:00 

我需要输出为

col1        col2            col3

    1   2020-02-27 15:00    
    1   2020-02-27 15:04    Y

根据 col2 中存在的最大时间戳值,col3 值必须填充为 Y 或 null。

我尝试了以下方法:

df = spark.sql("select col1,col2 from table")

max_ts = df.select(max("col2")).show() 

y=(f.when(f.col('col2') == max_ts, "Y")) 

df1 = df.withColumn('col3',y) 

上述方法仅产生空输出。

请提出可能的解决方案或错误?

TIA。

编辑:我需要在 col1 上执行 groupBy 并在 col2 中获取最大值

标签: apache-sparkpysparkapache-spark-sql

解决方案


也许这有帮助-

DSL API

max(..).over(window)

df2.show(false)
    df2.printSchema()
    /**
      * +----+-------------------+
      * |col1|col2               |
      * +----+-------------------+
      * |1   |2020-02-27 15:00:00|
      * |1   |2020-02-27 15:04:00|
      * +----+-------------------+
      *
      * root
      * |-- col1: integer (nullable = true)
      * |-- col2: timestamp (nullable = true)
      */

    val w = Window.partitionBy("col1")
    df2.withColumn("col3",
      when(max("col2").over(w).cast("long") - col("col2").cast("long")=== 0, "Y")
    )
      .show(false)

    /**
      * +----+-------------------+----+
      * |col1|col2               |col3|
      * +----+-------------------+----+
      * |1   |2020-02-27 15:00:00|null|
      * |1   |2020-02-27 15:04:00|Y   |
      * +----+-------------------+----+
      */

火花 SQL

 df2.createOrReplaceTempView("table")
    spark.sql(
      """
        | select col1, col2,
        |   case when (cast(max(col2) over (partition by col1) as long) - cast(col2 as long) = 0) then 'Y' end as col3
        | from table
      """.stripMargin)
      .show(false)

    /**
      * +----+-------------------+----+
      * |col1|col2               |col3|
      * +----+-------------------+----+
      * |1   |2020-02-27 15:00:00|null|
      * |1   |2020-02-27 15:04:00|Y   |
      * +----+-------------------+----+
      */

推荐阅读