scala - 根据列中的最大值过滤火花数据帧中的行的最有效方法
问题描述
我有一个名为 flightData2015 的火花数据框,格式如下:
+--------------------------+---------------------+-------+
| Destination_country_name | Origin_country_name | count |
+--------------------------+---------------------+-------+
| United States | Romania | 15 |
| United States | Croatia | 1 |
| United States | Ireland | 15 |
| Egypt | United States | 10 |
+--------------------------+---------------------+-------+
我想获取所有具有最大计数的行。所以在上面的例子中我会得到结果:
+--------------------------+---------------------+-------+
| Destination_country_name | Origin_country_name | count |
+--------------------------+---------------------+-------+
| United States | Romania | 15 |
| United States | Ireland | 15 |
+--------------------------+---------------------+-------+
我可以通过 SparkSQL 执行此操作,如下所示:
spark.sql("select * from flight_data_2015 where count = (select max(count) from flight_data_2015)")
但是,正如预期的那样,当我检查执行计划时,我发现数据集上有多次传递。
== Physical Plan ==
*(1) Project [DEST_COUNTRY_NAME#10, ORIGIN_COUNTRY_NAME#11, count#12]
+- *(1) Filter (isnotnull(count#12) && (count#12 = Subquery subquery209))
: +- Subquery subquery209
: +- *(2) HashAggregate(keys=[], functions=[max(count#12)])
: +- Exchange SinglePartition
: +- *(1) HashAggregate(keys=[], functions=[partial_max(count#12)])
: +- *(1) FileScan csv [count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/utk/Documents/Spark-The-Definitive-Guide/data/flight-data/csv/2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<count:int>
+- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/utk/Documents/Spark-The-Definitive-Guide/data/flight-data/csv/2..., PartitionFilters: [], PushedFilters: [IsNotNull(count)], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
+- Subquery subquery209
+- *(2) HashAggregate(keys=[], functions=[max(count#12)])
+- Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_max(count#12)])
+- *(1) FileScan csv [count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/utk/Documents/Spark-The-Definitive-Guide/data/flight-data/csv/2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<count:int>
我想知道是否有一种方法可以一次性完成。如果不是,那么使用和不使用 SparkSQL 的最佳方法是什么。
另外请记住,数据框实际上有超过 20 亿行,因此将所有内容都转移到一个分区是不可能的。
解决方案
推荐阅读
- python - matplotlib 中是否有任何行为类似于 alpha 但相反?
- vb.net - 将具有 T 属性的类与 json 转换
- javascript - 在单击 d3 js 条形图上滚动条
- java - 运行插入样本时出现问题
- angular - 在 angualr 6 中使用 sharedObject 服务的最佳方式是什么?
- dart - 在选择和自定义瓷砖高度时更改 ListTile 的背景颜色
- android - 调用 HttpUrlConnection.getResponseCode() 时 IntentService 冻结
- javascript - ESLint 警告(应该使用非回调方法) - 如何处理?
- amazon-s3 - Vagrant:来自 S3 的下载框(使用 MFA)
- c# - RetrieveEntityChangesRequest 附件