首页 > 解决方案 > Spark(pySpark)组通过错误排序collect_list上的第一个元素

问题描述

我有以下数据框(df_parquet):

DataFrame[id: bigint, date: timestamp, consumption: decimal(38,18)]

我打算使用collect_list获取日期和消费的排序列表,正如这篇文章中所述:collect_list by preserve order based on another variable

我正在遵循最后一种方法(https://stackoverflow.com/a/49246162/11841618),这是我认为更有效的方法。

因此,我不只是使用默认分区数(200)调用 repartition,而是使用 500 调用它,并且我按 id 和日期在分区内排序,而不仅仅是按日期(为了使 groupBy 更有效率,或者我希望如此) . 问题是每个分区一次(每个分区只有一个 id,而且它似乎是一个随机 id)我在最后一个位置获得列表的第一项。

关于发生了什么的任何线索?其余的 id 在其数组中排序良好,所以我认为 groupBy 或 collect_list 在每个分区内的行为方式存在一些问题。

我通过获取分区 id 并检查相同的 groupBy + collect_list 组合是否在其中一个值上失败,验证了它不是分区上的第一个或最后一个 id,它的行为不同,所以它似乎是随机的。

如果你愿意,你可以检查我的代码,它非常简单。


    ordered_df = df_parquet.repartition(500, 
    'id').sortWithinPartitions(['id', 'date'])

    grouped_df =  ordered_df.groupby("id").agg(F.collect_list("date").alias('date'), 
    F.collect_list('consumption').alias('consumption'))

并且代码用于测试它(比较第一个值和最后一个值,第一个值应该更旧,但在 500 例中不是):


    test = grouped_df.filter(F.size('date') > 
    1).select('id', (F.col('date').getItem(0) > 
    F.col('date').getItem(F.size('date') - 1)).alias('test'), 
    F.array([F.col('fecha').getItem(0), 
                      F.col('date').getItem(F.size('date') - 
    1)]).alias('see')).filter(F.col('test'))

    test.show(5, 100)

    test.count()

结果:

+-----+----+------------------------------------------+
|   id|test|                                       see|
+-----+----+------------------------------------------+
|89727|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76325|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|80115|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|89781|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76411|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
+-----+----+------------------------------------------+
only showing top 5 rows

500

虽然它预计是一个空数据框,因为所有数组都应该针对所有 id 进行排序。

标签: pythonapache-sparkgroup-bypysparkpartitioning

解决方案


好的,问题仍未解决,但我找到了一个简单的解决方法,以防有人因同样的问题而陷入困境:

关键是反转数组的第一个和最后一个位置。在日期数组上,这可以通过使用spark 2.4 中引入的array_sort函数进行排序来完成。要对消费数组执行重新排序,我们需要使用 udf。

invert_last = F.udf(lambda vector: [vector[-1]] + vector[:-1], ArrayType(DoubleType()))

test = grouped_df.withColumn('error', (F.size('date') > 1) & (F.col('date').getItem(0) >
           F.col('date').getItem(F.size('date') - 1))).withColumn('date', F.when(F.col('error'),
           F.array_sort(F.col('date'))).otherwise(F.col('date'))).withColumn('consumption',
           F.when(F.col('error'), invert_last(F.col('consumption'))).otherwise(F.col('consumption'))).drop('error')

干杯。


推荐阅读