首页 > 解决方案 > 为分组的火花数据框选择最旧的列

问题描述

给定具有以下列的数据框(df):

id,
created_date,
name

我需要确保所有同名的行都具有相同的 id。我可以创建从旧 id 到新 id 的映射(使用 max 在“随机”处选择)。

df.groupBy('name')\
  .agg(
    func.max('id').alias('new_id'),                         
    func.collect_set(id).alias('grouped_ids'))\
  .filter(func.size('grouped_ids') > 1)\                
  .select(func.explode("grouped_ids").alias('old_id'), "new_id")\
  .filter("new_id != old_id")

如果有可用的 new_id,我可以将 leftouter 加入到原始 df(在 id = old_id 上)并交换 id。

但是,我需要确保选择的 new_id 是数据框中最旧的 created_date (而不仅仅是选择最大值)。

如何最好地解决这个问题?

例如给定数据

id, created_date, name
---
17a, 2019-01-05, Jeff
17a, 2019-01-03, Jeremy
d21, 2019-01-04, Jeremy
u45, 2019-01-04, Jeremy
d21, 2019-01-02, Scott
x22, 2019-01-01, Julian

Jeremy 上的第 2、3 和 4 行分组,所以应该有相同的 id。分组 id 的数据框中最旧的 id 是 d21,因为在第 5 行,created_date 是 2019-01-02,因此应该选择它并将其应用于具有其他分组 id 的数据框中的所有行,我们最终得到:

id, created_date, name
---
d21, 2019-01-05, Jeff
d21, 2019-01-03, Jeremy
d21, 2019-01-04, Jeremy
d21, 2019-01-04, Jeremy
d21, 2019-01-02, Scott
x22, 2019-01-01, Julian

更新: @Charles Du - 干杯,我尝试了你的代码,但没有成功,最旧的 id 是从分组名称中选择的,而不是 df 作为一个整体,并且 new_id 没有在整个 df 中应用。

Result:
0 = {Row} Row(name='Scott', created_date='2019-01-02', new_ID='d21', id='d21', created_date='2019-01-02')
1 = {Row} Row(name='Julian', created_date='2019-01-01', new_ID='x22', id='x22', created_date='2019-01-01')
2 = {Row} Row(name='Jeremy', created_date='2019-01-03', new_ID='17a', id='17a', created_date='2019-01-03')
3 = {Row} Row(name='Jeremy', created_date='2019-01-03', new_ID='17a', id='d21', created_date='2019-01-04')
4 = {Row} Row(name='Jeremy', created_date='2019-01-03', new_ID='17a', id='u45', created_date='2019-01-04')
5 = {Row} Row(name='Jeff', created_date='2019-01-05', new_ID='17a', id='17a', created_date='2019-01-05')

标签: dataframepysparkapache-spark-sqlpyspark-sql

解决方案


我的吐痰球在这里

from pyspark.sql import functions as F

new_df = df.groupBy('name').agg(F.min('date'))

new_df = new_df.join(df, on=['name', 'date'], how='inner')

# This should give you a df with a single record for each name with the oldest ID.

new_df = new_df.withColumnRenamed('id', 'new_ID')

#you'll need to decide on a naming convention for your date column since you'll have two if you don't rename

res = new_df.join(df, on='name', how='inner)

这应该将您的身份证与最早的日期相匹配。


推荐阅读