首页 > 解决方案 > UDF 函数如何在 pyspark 中以日期为参数工作?

问题描述

我前段时间开始使用 pyspark 世界,我正在用一种算法绞尽脑汁,最初我想创建一个计算两个日期之间月份差的函数,我知道有一个函数可以解决这个问题(months_between),但是它的工作方式与我想要的有点不同,我想从两个日期中提取月份并减去而不考虑天数,只考虑月份和年份,关键是,我可以通过操作基础来做到这一点,创建新的带有月份和减法的列,但我想将其作为 UDF 函数执行,如下所示:

from datetime import datetime
import pyspark.sql.functions as f

base_study = spark.createDataFrame([("1", "2009-01-31", "2007-01-31"),("2","2009-01-31","2011-01-31")], ['ID', 'A', 'B'])
base_study = base_study.withColumn("A",f.to_date(base_study["A"], 'yyyy-MM-dd'))
base_study = base_study.withColumn("B",f.to_date(base_study["B"], 'yyyy-MM-dd'))


def intckSasFunc(RecentDate, PreviousDate):
    RecentDate = f.month("RecentDate")
    PreviousDate = f.month("PreviousDate")
    months_diff = (RecentDate.year - PreviousDate.year) * 12 + (RecentDate.month - PreviousDate.month)
    return months_diff
  
intckSasFuncUDF = f.udf(intckSasFunc, IntegerType())

base_study.withColumn('Result', intckSasFuncUDF(f.col('B'), f.col('A') ))

我做错了什么?

另一个问题:当我在 UDF 函数中传递参数时,它们是逐个发送还是传递整个列?而这个专栏是一个系列?

谢谢!

标签: datepysparkuser-defined-functions

解决方案


我找到了一个解决方案并将其升级以处理缺失。

from datetime import datetime
import pyspark.sql.functions as f

base_study = spark.createDataFrame([("1", None, "2015-01-01"),("2","2015-01-31","2015-01-31")], ['ID', 'A', 'B'])
base_study = base_study.withColumn("A",f.to_date(base_study["A"], 'yyyy-MM-dd'))
base_study = base_study.withColumn("B",f.to_date(base_study["B"], 'yyyy-MM-dd'))


def intckSasFunc(RecentDate, PreviousDate):

  if (PreviousDate and RecentDate) is not None:
    months_diff = (RecentDate.year - PreviousDate.year) * 12 + (RecentDate.month - PreviousDate.month)
    return months_diff
  else:
    return None 

intckSasFuncUDF = f.udf(lambda x,y:intckSasFunc(x,y) , IntegerType())

display(base_study.withColumn('Result', intckSasFuncUDF(f.col('B'), f.col('A'))))

对于那些有疑问的人,就像我一样,该函数一次处理一条记录,就好像它是一个普通的python函数一样,我不能在这个UDF中使用pyspark.sql函数,它似乎给出了一个错误,这些函数仅在 pypsark 列中使用,并且在 UDF 中,转换是逐行进行的。


推荐阅读