python - Spark(pySpark)组通过错误排序collect_list上的第一个元素
问题描述
我有以下数据框(df_parquet):
DataFrame[id: bigint, date: timestamp, consumption: decimal(38,18)]
我打算使用collect_list获取日期和消费的排序列表,正如这篇文章中所述:collect_list by preserve order based on another variable
我正在遵循最后一种方法(https://stackoverflow.com/a/49246162/11841618),这是我认为更有效的方法。
因此,我不只是使用默认分区数(200)调用 repartition,而是使用 500 调用它,并且我按 id 和日期在分区内排序,而不仅仅是按日期(为了使 groupBy 更有效率,或者我希望如此) . 问题是每个分区一次(每个分区只有一个 id,而且它似乎是一个随机 id)我在最后一个位置获得列表的第一项。
关于发生了什么的任何线索?其余的 id 在其数组中排序良好,所以我认为 groupBy 或 collect_list 在每个分区内的行为方式存在一些问题。
我通过获取分区 id 并检查相同的 groupBy + collect_list 组合是否在其中一个值上失败,验证了它不是分区上的第一个或最后一个 id,它的行为不同,所以它似乎是随机的。
如果你愿意,你可以检查我的代码,它非常简单。
ordered_df = df_parquet.repartition(500,
'id').sortWithinPartitions(['id', 'date'])
grouped_df = ordered_df.groupby("id").agg(F.collect_list("date").alias('date'),
F.collect_list('consumption').alias('consumption'))
并且代码用于测试它(比较第一个值和最后一个值,第一个值应该更旧,但在 500 例中不是):
test = grouped_df.filter(F.size('date') >
1).select('id', (F.col('date').getItem(0) >
F.col('date').getItem(F.size('date') - 1)).alias('test'),
F.array([F.col('fecha').getItem(0),
F.col('date').getItem(F.size('date') -
1)]).alias('see')).filter(F.col('test'))
test.show(5, 100)
test.count()
结果:
+-----+----+------------------------------------------+
| id|test| see|
+-----+----+------------------------------------------+
|89727|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76325|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|80115|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|89781|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76411|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
+-----+----+------------------------------------------+
only showing top 5 rows
500
虽然它预计是一个空数据框,因为所有数组都应该针对所有 id 进行排序。
解决方案
好的,问题仍未解决,但我找到了一个简单的解决方法,以防有人因同样的问题而陷入困境:
关键是反转数组的第一个和最后一个位置。在日期数组上,这可以通过使用spark 2.4 中引入的array_sort函数进行排序来完成。要对消费数组执行重新排序,我们需要使用 udf。
invert_last = F.udf(lambda vector: [vector[-1]] + vector[:-1], ArrayType(DoubleType()))
test = grouped_df.withColumn('error', (F.size('date') > 1) & (F.col('date').getItem(0) >
F.col('date').getItem(F.size('date') - 1))).withColumn('date', F.when(F.col('error'),
F.array_sort(F.col('date'))).otherwise(F.col('date'))).withColumn('consumption',
F.when(F.col('error'), invert_last(F.col('consumption'))).otherwise(F.col('consumption'))).drop('error')
干杯。
推荐阅读
- python - mysql.connector 未检测到数据的外部更改
- c# - ASP.NET Core 2.1 - 实现 MemoryCache 时出错
- regex - 正则表达式在第一个双引号处停止
- reactjs - 使用laravel和react时如何做SSR?
- hadoop - 如何创建以半列分隔并以逗号作为小数点的分区表?
- assembly - 从 EAs FIFA 19 读取分数
- sequelize.js - Sequelizer 返回更新前的数据
- mysql - 如何获取列值与MySql中选择查询的行匹配的表的所有行
- arkit - 为什么 ARKit 将记录的世界地图与当前环境协调后,锚点的位置不会改变?
- javascript - iframe 内的弹出窗口不显示在父页面上