apache-spark - PySpark DataFrame 根据另一列中时间戳值的最小/最大条件更新列值
问题描述
我有以下数据框:
col1 col2
1 2020-02-27 15:00:00
1 2020-02-27 15:04:00
我需要输出为
col1 col2 col3
1 2020-02-27 15:00
1 2020-02-27 15:04 Y
根据 col2 中存在的最大时间戳值,col3 值必须填充为 Y 或 null。
我尝试了以下方法:
df = spark.sql("select col1,col2 from table")
max_ts = df.select(max("col2")).show()
y=(f.when(f.col('col2') == max_ts, "Y"))
df1 = df.withColumn('col3',y)
上述方法仅产生空输出。
请提出可能的解决方案或错误?
TIA。
编辑:我需要在 col1 上执行 groupBy 并在 col2 中获取最大值
解决方案
也许这有帮助-
DSL API
max(..).over(window)
df2.show(false)
df2.printSchema()
/**
* +----+-------------------+
* |col1|col2 |
* +----+-------------------+
* |1 |2020-02-27 15:00:00|
* |1 |2020-02-27 15:04:00|
* +----+-------------------+
*
* root
* |-- col1: integer (nullable = true)
* |-- col2: timestamp (nullable = true)
*/
val w = Window.partitionBy("col1")
df2.withColumn("col3",
when(max("col2").over(w).cast("long") - col("col2").cast("long")=== 0, "Y")
)
.show(false)
/**
* +----+-------------------+----+
* |col1|col2 |col3|
* +----+-------------------+----+
* |1 |2020-02-27 15:00:00|null|
* |1 |2020-02-27 15:04:00|Y |
* +----+-------------------+----+
*/
火花 SQL
df2.createOrReplaceTempView("table")
spark.sql(
"""
| select col1, col2,
| case when (cast(max(col2) over (partition by col1) as long) - cast(col2 as long) = 0) then 'Y' end as col3
| from table
""".stripMargin)
.show(false)
/**
* +----+-------------------+----+
* |col1|col2 |col3|
* +----+-------------------+----+
* |1 |2020-02-27 15:00:00|null|
* |1 |2020-02-27 15:04:00|Y |
* +----+-------------------+----+
*/
推荐阅读
- tensorflow-federated - 尝试“tff.learning.assign_weights_to_keras_model”方法时出错
- c++ - 在 C++ 中,给定字符串向量,如何在编译时填充映射?
- javascript - 如何将数据从 javascript 函数传递到重定向页面?
- javascript - 为什么leaflet.js 在map.addlayer(layer) 上添加了不止一层?
- javascript - 使用 CefSharp 获取 xhtml 后停止内容加载
- java - 如何避免这种情况下的instanceof?
- visual-studio-code - VSCode 冻结并停止响应
- java - Log4j2:如何说服它 log4j-core.jar 的类确实存在?
- azure-devops - 如何允许运行时参数使用空字符串?
- javascript - 为什么使用 mywindow.print() 从 pdf 文件中排除图像?