PySpark mean-days calculation equivalent to pandas code

Problem description

# pandas code

temp = df_merge[['subscription_id', 'cancelleddate', 'subscriptionstartdate', 'termenddate']].drop_duplicates()
df_merge['mean_cancelled_sub_duration'] = (temp['cancelleddate'] - temp['subscriptionstartdate']).dt.days.dropna().mean() / 365
df_merge['mean_sub_duration'] = (temp['termenddate'] - temp['subscriptionstartdate']).dt.days.dropna().mean() / 365

How can I implement the same logic as the pandas code in PySpark? I tried the following, but it did not help me: rows get dropped and the computed values are wrong:

The columns with "date" in their names are of date type.

# Failed PySpark translation

    from pyspark.sql import Window
    from pyspark.sql.functions import broadcast, col, datediff, mean

    temp = df_merge.select('subscription_id', 'cancelleddate', 'subscriptionstartdate', 'termenddate').dropDuplicates()
    temp = temp.withColumn("cancelled_sub_duration", datediff(temp.cancelleddate, temp.subscriptionstartdate)) \
               .withColumn("sub_duration", datediff(temp.termenddate, temp.subscriptionstartdate))
    temp = temp.na.drop(subset=['cancelled_sub_duration', 'sub_duration'])
    spec3 = Window.partitionBy("subscription_id")
    temp = temp.withColumn('mean_cancelled_sub_duration', mean("cancelled_sub_duration").over(spec3) / 365) \
               .withColumn('mean_sub_duration', mean("sub_duration").over(spec3) / 365)
    temp = temp.select(col('subscription_id').alias('subsid'), col('mean_cancelled_sub_duration'), col('mean_sub_duration'))
    df_merge = df_merge.join(broadcast(temp), df_merge.subscription_id == temp.subsid, "left").drop(col('subsid'))
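
To illustrate what goes wrong, here is a toy reproduction (hypothetical data; only the column names match the real set): the window mean comes out per subscription_id, whereas the pandas Series.mean() above is a single value over all deduplicated rows.

    from pyspark.sql import SparkSession, Window
    from pyspark.sql.functions import avg, datediff, lit

    spark = SparkSession.builder.getOrCreate()
    toy = spark.createDataFrame(
        [('a', '2020-01-01', '2020-12-31'),
         ('a', '2020-01-01', '2021-12-31'),
         ('b', '2019-01-01', '2021-01-01')],
        ['subscription_id', 'subscriptionstartdate', 'termenddate']
    ).selectExpr('subscription_id',
                 'to_date(subscriptionstartdate) AS subscriptionstartdate',
                 'to_date(termenddate) AS termenddate')

    dur = datediff('termenddate', 'subscriptionstartdate')
    # Window mean: 1.5 for 'a', ~2.0 for 'b' -- a different value per id.
    toy.withColumn('per_id_mean',
                   avg(dur).over(Window.partitionBy('subscription_id')) / lit(365)).show()
    # Global mean, like the pandas code: one scalar (~1.67) shared by all rows.
    toy.select((avg(dur) / lit(365)).alias('global_mean')).show()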

Tags: python, pandas, apache-spark, pyspark, apache-spark-sql

Solution


Hi, please post the expected output of the pandas code and what you actually get from the PySpark code, so we can assess the difference between the two. Without that it is hard to see specifically what is working and what isn't.

In the meantime, looking just at the pandas code and trying to do likewise in PySpark, this is what I came up with.

from pyspark.sql import Window
from pyspark.sql.functions import avg, datediff, lit

spec3 = Window.partitionBy('subscription_id')  # same spec3 as in the question
temp = temp \
    .withColumn('mean_cancelled_sub_duration', avg(datediff('cancelleddate', 'subscriptionstartdate')).over(spec3) / lit(365)) \
    .withColumn('mean_sub_duration', avg(datediff('termenddate', 'subscriptionstartdate')).over(spec3) / lit(365))
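
One more thing to double-check: the pandas code assigns a single global scalar to every row, not a per-subscription value. If that is the behaviour you are after, a plain aggregation followed by a crossJoin may be closer to the mark. A minimal sketch under that assumption (avg() skips nulls, so it matches dropna().mean() without an explicit na.drop step):

from pyspark.sql.functions import avg, broadcast, datediff

# One-row DataFrame holding the two global means over the deduplicated rows.
temp = df_merge.select('subscription_id', 'cancelleddate', 'subscriptionstartdate', 'termenddate').dropDuplicates()
means = temp.agg(
    (avg(datediff('cancelleddate', 'subscriptionstartdate')) / 365).alias('mean_cancelled_sub_duration'),
    (avg(datediff('termenddate', 'subscriptionstartdate')) / 365).alias('mean_sub_duration'))
# Attach the two constants to every row, as the pandas assignment does.
df_merge = df_merge.crossJoin(broadcast(means))

This also sidesteps the na.drop in your attempt, which drops a row when either duration is null and so silently discards valid sub_duration values for subscriptions that were never cancelled.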
