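scala - Filtering array values during aggregation in a Spark dataframe
Problem description
I am aggregating the following dataframe to get a list of advertisers, each with an array of its brands: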
+------------+------+
|advertiser |brand |
+------------+------+
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
+------------+------+
Here is my code:
import org.apache.spark.sql.functions.collect_list
df2
  .groupBy("advertiser")
  .agg(collect_list("brand").as("brands"))
This gives me the following dataframe:
+------------+----------------+
|advertiser |brands |
+------------+----------------+
|Advertiser 1|[Brand1, Brand2]|
|Advertiser 2|[Brand3, Brand4]|
|Advertiser 3|[Brand5, Brand6]|
+------------+----------------+
During the aggregation, I want to filter the list of brands using the following table:
+------+------------+
|brand |brand name |
+------+------------+
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
+------+------------+
In order to get:
+------------+--------+
|advertiser |brands |
+------------+--------+
|Advertiser 1|[Brand1]|
|Advertiser 2|[Brand3]|
|Advertiser 3|null |
+------------+--------+
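Solution
I see two solutions to your problem, which I will call the collect solution and the join solution.
Collect solution
If you can collect the brands dataframe, you can use the collected values to keep only the right brands when performing collect_list, then flatten the resulting array and replace empty arrays with null, as follows: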
import org.apache.spark.sql.functions.{array, col, collect_list, flatten, size, when}
// Collect the brands to keep onto the driver
val filteredBrands = brands.select("brand").collect().map(_.getString(0))
val finalDataframe = df2
  .groupBy("advertiser")
  // Wrap each kept brand in a one-element array and every other brand in an empty array
  .agg(collect_list(when(col("brand").isin(filteredBrands: _*), array(col("brand"))).otherwise(array())).as("brands"))
  // Merge the per-row arrays into a single array per advertiser
  .withColumn("brands", flatten(col("brands")))
  // Replace empty arrays with null
  .withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))
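A possible simplification, which is not part of the original answer: collect_list skips null values, so the array wrapping and the flatten step may be unnecessary. The sketch below rebuilds the sample data from the question so it can be pasted into spark-shell or run as a script; the local SparkSession settings and the names keptBrands and result are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, size, when}

// Assumption: a local session, only for trying the example out
val spark = SparkSession.builder().master("local[*]").appName("filter-brands").getOrCreate()
import spark.implicits._

// Sample data copied from the question
val df2 = Seq(
  ("Advertiser 1", "Brand1"), ("Advertiser 1", "Brand2"),
  ("Advertiser 2", "Brand3"), ("Advertiser 2", "Brand4"),
  ("Advertiser 3", "Brand5"), ("Advertiser 3", "Brand6")
).toDF("advertiser", "brand")

val brands = Seq(
  ("Brand1", "Brand_name_1"),
  ("Brand3", "Brand_name_3")
).toDF("brand", "brand name")

// Collect the brands to keep onto the driver (only sensible when brands is small)
val keptBrands = brands.select("brand").collect().map(_.getString(0))

val result = df2
  .groupBy("advertiser")
  // when(...) without otherwise yields null for other brands, and collect_list drops nulls
  .agg(collect_list(when(col("brand").isin(keptBrands: _*), col("brand"))).as("brands"))
  // Keep non-empty arrays; empty arrays become null because there is no otherwise branch
  .withColumn("brands", when(size(col("brands")) > 0, col("brands")))

result.show(false)
```

The row order printed by show is not guaranteed after a groupBy, so the advertisers may appear in a different order than in the question.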
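Join solution
If your brands dataframe does not fit in memory, you can first left join df2 with brands to get a column that contains the brand if it is in the brands dataframe and null otherwise, then group by advertiser, and finally replace with null the empty arrays produced for advertisers that have none of the brands you want to filter on: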
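import org.apache.spark.sql.functions.{col, collect_list, size, when}
val finalDataframe = df2
  // filtered_brand holds the brand when it exists in brands, null otherwise
  .join(brands.select(col("brand").as("filtered_brand")), col("filtered_brand") === col("brand"), "left_outer")
  // collect_list ignores nulls, so only the matching brands are kept
  .groupBy("advertiser").agg(collect_list(col("filtered_brand")).as("brands"))
  // Replace empty arrays with null
  .withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))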
Details
So, if we start with a df2 dataframe as follows:
+------------+------+
|advertiser |brand |
+------------+------+
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
+------------+------+
and a brands dataframe as follows:
+------+------------+
|brand |brand name |
+------+------------+
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
+------+------------+
After the first left outer join between the df2 and brands dataframes (the first line), you get the following dataframe:
+------------+------+--------------+
|advertiser |brand |filtered_brand|
+------------+------+--------------+
|Advertiser 1|Brand1|Brand1 |
|Advertiser 1|Brand2|null |
|Advertiser 2|Brand3|Brand3 |
|Advertiser 2|Brand4|null |
|Advertiser 3|Brand5|null |
|Advertiser 3|Brand6|null |
+------------+------+--------------+
When you group this dataframe by advertiser, collecting the list of filtered brands, you get the following dataframe:
+------------+--------+
|advertiser |brands |
+------------+--------+
|Advertiser 2|[Brand3]|
|Advertiser 3|[] |
|Advertiser 1|[Brand1]|
+------------+--------+
Finally, when you apply the last line, which replaces empty arrays with null, you get the expected result:
+------------+--------+
|advertiser |brands |
+------------+--------+
|Advertiser 2|[Brand3]|
|Advertiser 3|null |
|Advertiser 1|[Brand1]|
+------------+--------+
Conclusion
The collect solution creates only one expensive shuffle step (during the groupBy) and should be preferred if your brands dataframe is small. The join solution works if your brands dataframe is big, but it creates more expensive shuffle steps: one for the groupBy and one for the join.
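A possible middle ground, which is not part of the original answer: when brands is small enough to be copied to every executor but you would rather not collect it onto the driver yourself, a broadcast hint on the join may let Spark avoid the shuffle for the join. This is a sketch under that assumption; the function name filterBrandsWithBroadcastJoin is hypothetical, and Spark remains free to pick another join strategy.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{broadcast, col, collect_list, size, when}

// Hypothetical helper: same logic as the join solution, with a broadcast hint on brands
def filterBrandsWithBroadcastJoin(df2: DataFrame, brands: DataFrame): DataFrame = {
  // Keep only the join key and rename it to avoid a clash with df2's brand column
  val keptBrands = brands.select(col("brand").as("filtered_brand"))

  df2
    // broadcast(...) is only a hint that the right side is small enough to replicate
    .join(broadcast(keptBrands), col("filtered_brand") === col("brand"), "left_outer")
    .groupBy("advertiser")
    // collect_list ignores nulls, so non-matching brands are dropped
    .agg(collect_list(col("filtered_brand")).as("brands"))
    // Keep non-empty arrays; empty arrays become null because there is no otherwise branch
    .withColumn("brands", when(size(col("brands")) > 0, col("brands")))
}
```

It would be called as filterBrandsWithBroadcastJoin(df2, brands). The groupBy still needs its own shuffle, so the saving applies to the join step only.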