首页 > 解决方案 > PySpark如何根据行值创建列

问题描述

我有两个 Spark Dataframes,第一个包含Events以下信息:

ID 用户身份 日期
1 1 2020-12-01
2 2 2021-10-10

第二个 Dataframe 包含Purchase以下相关信息:

ID 用户身份 日期 价值
1 1 2020-11-10 50
2 1 2020-10-10 25
3 2 2020-09-15 100

我想加入两个数据框并创建一个包含最后一个值的列,另一列包含过去 2 个值之间的差异,以及过去 2 次购买之间的日期差异,如下所示:

ID 用户身份 日期 最后一个值 差异值 Diff_Date
1 1 2020-12-01 50 25 30
2 2 2021-10-10 100 无效的 无效的

要加入数据框,我使用以下代码:

(Events.join(Purchase,
         on = [Events.User_id == Purchase.User_id,
               Events.Date >= Purchase.Date],
         how = "left")
   .withColumn('rank_date', F.rank().over(W.partitionBy(Events['Id']).orderBy(Purchase['Data'].desc())))

使用此代码,我可以看到按日期排序的事件之前的购买是什么,但是如何访问行值并根据这些值创建列?

标签: pysparkapache-spark-sql

解决方案


我认为这样更容易进行:

  • 使用购买数据框
  • 加入/过滤事件数据框

然后可以这样做:

window_user_id = Window.partitionBy('user_id')

(
    purchase
    .withColumn('purchase_rank', F.rank().over(window_user_id.orderBy(F.col('date').asc())))
    .withColumn('previous_value',   F.lag('value', 1).over(window_user_id.orderBy(F.col('date'))))
    .withColumn("diff_value", F.when(F.isnull(F.col("value") - F.col("previous_value")), 0).otherwise(F.col("value") - F.col("previous_value")))
    .withColumn('diff_days', F.datediff('date', F.lag('date', 1).over(window_user_id.orderBy(F.col('date')))))
    .drop("previous_value")
    .show()
)

+---+-------+----------+-----+-------------+----------+---------+
| id|user_id|      date|value|purchase_rank|diff_value|diff_days|
+---+-------+----------+-----+-------------+----------+---------+
|  2|      1|2020-10-10|   25|            1|         0|     null|
|  1|      1|2020-11-10|   50|            2|        25|       31|
|  3|      2|2020-09-15|  100|            1|         0|     null|
+---+-------+----------+-----+-------------+----------+---------+

从这里,更容易加入/过滤任何其他数据。

只保留每个 user_id 的最后一次购买:

(
    purchase
    .withColumn('purchase_rank', F.rank().over(window_user_id.orderBy(F.col('date').asc())))
    .withColumn('previous_value',   F.lag('value', 1).over(window_user_id.orderBy(F.col('date'))))
    .withColumn("diff_value", F.when(F.isnull(F.col("value") - F.col("previous_value")), 0).otherwise(F.col("value") - F.col("previous_value")))
    .withColumn('diff_days', F.datediff('date', F.lag('date', 1).over(window_user_id.orderBy(F.col('date')))))
    .withColumn("last_purchase", F.last("purchase_rank").over(window_user_id))
    .filter(F.col("purchase_rank") == F.col("last_purchase"))
    .drop("previous_value", "purchase_rank", "last_purchase")
    .show()
)

+---+-------+----------+-----+----------+---------+
| id|user_id|      date|value|diff_value|diff_days|
+---+-------+----------+-----+----------+---------+
|  1|      1|2020-11-10|   50|        25|       31|
|  3|      2|2020-09-15|  100|         0|     null|
+---+-------+----------+-----+----------+---------+

推荐阅读