python - 在数据框中查找以前的唯一值 - Pyspark
问题描述
我有一个数据框,其中包含客户每个日期的多个产品。在一个新专栏中,我试图按客户获取以前的唯一日期。
Cst Prod Dt Desired Output
C1 P1 1-Jan-16 0
C1 P2 1-Jan-16 0
C1 P3 1-Jan-16 0
C1 P4 1-Jan-16 0
C1 P1 20-Jan-16 1-Jan-16
C1 P2 20-Jan-16 1-Jan-16
C2 P2 5-Feb-17 0
C2 P3 5-Feb-17 0
C2 P4 5-Feb-17 0
C2 P1 30-Mar-17 5-Feb-17
我刚从 PySpark 开始。到目前为止,我尝试为每个客户创建一个日期数组列 (CUM_DATE),然后应用 UDF 来获取除行中一个之外的所有日期,然后获取数组列的最大值。
有点像——
def filter_currdate(arr, dt):
return [x for x in arr if x not in dt]
filter_currdate_udf = F.udf(lambda x: filter_code(x), ArrayType(DateType()))
df = df.withColumn('except_date', filter_currdate_udf(df['CUM_DATE'], df['Dt']))
df = df.withColumn('max_prev_date',F.max(df['except_date']))
但它遇到了错误,我无法找到更好的方法来获得这个输出。
解决方案
还有其他没有自定义 UDF 函数的方法。假设df
有列cst
, prod
, dt
:
from pyspark.sql.functions import max
df.alias('df1').join(df.alias('df2'),
(
col('df1.cst')==col('df2.cst')
& col('df1.prod') == col('df2.prod')
& col('df1.dt') > col('df2.dt'),
how='left_outer'
).select('df1.*', 'df2.dt')
.groupBy('df1.cst', 'df1.prod', 'df1.dt')
.agg(max('df2.dt'))
推荐阅读
- php - 传递多维 JSON PHP 结果错误
- node.js - 如何从控制器 JSON 返回的实体字段中排除。NestJS + Typeorm
- algorithm - 应该使用哪些符号来指定乳胶中变量的版本?
- python - 将列添加到数据框 K-Mean - Python
- python - 如何使用 python 和 webdriver 在标签之间获取文本?
- apache-spark - Spark:YARN 在 NettyMemoryMetrics 上抛出 NoSuchMethodError
- python - 登录后有条件地重定向用户
- javascript - setState 不呈现(在 ftech 中获取)
- python - C++纯虚函数和python中的this指针
- machine-learning - 随机梯度下降增加成本函数