pyspark - PySpark如何根据行值创建列
问题描述
我有两个 Spark Dataframes,第一个包含Events
以下信息:
ID | 用户身份 | 日期 |
---|---|---|
1 | 1 | 2020-12-01 |
2 | 2 | 2021-10-10 |
第二个 Dataframe 包含Purchase
以下相关信息:
ID | 用户身份 | 日期 | 价值 |
---|---|---|---|
1 | 1 | 2020-11-10 | 50 |
2 | 1 | 2020-10-10 | 25 |
3 | 2 | 2020-09-15 | 100 |
我想加入两个数据框并创建一个包含最后一个值的列,另一列包含过去 2 个值之间的差异,以及过去 2 次购买之间的日期差异,如下所示:
ID | 用户身份 | 日期 | 最后一个值 | 差异值 | Diff_Date |
---|---|---|---|---|---|
1 | 1 | 2020-12-01 | 50 | 25 | 30 |
2 | 2 | 2021-10-10 | 100 | 无效的 | 无效的 |
要加入数据框,我使用以下代码:
(Events.join(Purchase,
on = [Events.User_id == Purchase.User_id,
Events.Date >= Purchase.Date],
how = "left")
.withColumn('rank_date', F.rank().over(W.partitionBy(Events['Id']).orderBy(Purchase['Data'].desc())))
使用此代码,我可以看到按日期排序的事件之前的购买是什么,但是如何访问行值并根据这些值创建列?
解决方案
我认为这样更容易进行:
- 使用购买数据框
- 加入/过滤事件数据框
然后可以这样做:
window_user_id = Window.partitionBy('user_id')
(
purchase
.withColumn('purchase_rank', F.rank().over(window_user_id.orderBy(F.col('date').asc())))
.withColumn('previous_value', F.lag('value', 1).over(window_user_id.orderBy(F.col('date'))))
.withColumn("diff_value", F.when(F.isnull(F.col("value") - F.col("previous_value")), 0).otherwise(F.col("value") - F.col("previous_value")))
.withColumn('diff_days', F.datediff('date', F.lag('date', 1).over(window_user_id.orderBy(F.col('date')))))
.drop("previous_value")
.show()
)
+---+-------+----------+-----+-------------+----------+---------+
| id|user_id| date|value|purchase_rank|diff_value|diff_days|
+---+-------+----------+-----+-------------+----------+---------+
| 2| 1|2020-10-10| 25| 1| 0| null|
| 1| 1|2020-11-10| 50| 2| 25| 31|
| 3| 2|2020-09-15| 100| 1| 0| null|
+---+-------+----------+-----+-------------+----------+---------+
从这里,更容易加入/过滤任何其他数据。
只保留每个 user_id 的最后一次购买:
(
purchase
.withColumn('purchase_rank', F.rank().over(window_user_id.orderBy(F.col('date').asc())))
.withColumn('previous_value', F.lag('value', 1).over(window_user_id.orderBy(F.col('date'))))
.withColumn("diff_value", F.when(F.isnull(F.col("value") - F.col("previous_value")), 0).otherwise(F.col("value") - F.col("previous_value")))
.withColumn('diff_days', F.datediff('date', F.lag('date', 1).over(window_user_id.orderBy(F.col('date')))))
.withColumn("last_purchase", F.last("purchase_rank").over(window_user_id))
.filter(F.col("purchase_rank") == F.col("last_purchase"))
.drop("previous_value", "purchase_rank", "last_purchase")
.show()
)
+---+-------+----------+-----+----------+---------+
| id|user_id| date|value|diff_value|diff_days|
+---+-------+----------+-----+----------+---------+
| 1| 1|2020-11-10| 50| 25| 31|
| 3| 2|2020-09-15| 100| 0| null|
+---+-------+----------+-----+----------+---------+
推荐阅读
- unit-testing - 如何在单元测试 DataRow 中将 int 转换为 byte?
- python - Pyqt:自定义框架在 mouseMoveEvent 下未检测到鼠标单击
- sql-server - 如何在不使用触发器的情况下在 sql server 中获取更新的列名
- sql - Oracle 中的 Spring Data JPA + EclipseLink
- oracle - 在 oracle 实例 19c 中导入 Oracle DUMP 文件
- c - 无法对 char 数组中的整数执行算术运算
- sql-server - 从 SQL 作业执行 SSIS 包
- c - 同步中的 pthread_mutex_lock 问题
- reactjs - 样式化的组件抛出奇怪的打字稿错误
- javascript - 如何使用 react-spring 制作一个 react 自定义组件动画