Filtering array values during aggregation in a Spark DataFrame

Problem description

I am aggregating the following DataFrame to get a list of advertisers, each with an array of its brands:

+------------+------+
|advertiser  |brand |
+------------+------+
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
+------------+------+

Here is my code:

import org.apache.spark.sql.functions.collect_list

df2
  .groupBy("advertiser")
  .agg(collect_list("brand").as("brands"))

This gives me the following DataFrame:

+------------+----------------+
|advertiser  |brands          |
+------------+----------------+
|Advertiser 1|[Brand1, Brand2]|
|Advertiser 2|[Brand3, Brand4]|
|Advertiser 3|[Brand5, Brand6]|
+------------+----------------+

During the aggregation, I want to filter the list of brands using the following table:

+------+------------+
|brand |brand name  |
+------+------------+
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
+------+------------+

in order to obtain:

+------------+--------+
|advertiser  |brands  |
+------------+--------+
|Advertiser 1|[Brand1]|
|Advertiser 2|[Brand3]|
|Advertiser 3|null    |
+------------+--------+

Tags: scala, apache-spark, pyspark, apache-spark-sql, scala-collections

Solution


I see two solutions to your problem, which I will call the collect solution and the join solution.

Collect solution

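If you can collect the brands DataFrame, you can use the collected values to keep only the matching brands while performing collect_list, then flatten the result, and finally replace empty arrays with null, as follows: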

import org.apache.spark.sql.functions.{array, col, collect_list, flatten, size, when}

val filteredBrands = brands.select("brand").collect().map(_.getString(0))

val finalDataframe = df2
  .groupBy("advertiser")
  .agg(collect_list(when(col("brand").isin(filteredBrands: _*), array(col("brand"))).otherwise(array())).as("brands"))
  .withColumn("brands", flatten(col("brands")))
  .withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))
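
As a possible simplification of the code above, the sketch below relies on the fact that collect_list skips null values, so wrapping each brand in an array and calling flatten is not strictly needed. The local SparkSession, the app name, the sample data and the name `simplified` are assumptions added for illustration; they are not part of the original answer.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, size, when}

// Local session for illustration only; in spark-shell, getOrCreate() reuses the existing session.
val spark = SparkSession.builder().master("local[*]").appName("collect-sketch").getOrCreate()
import spark.implicits._

// Sample data mirroring the tables in the question.
val df2 = Seq(
  ("Advertiser 1", "Brand1"), ("Advertiser 1", "Brand2"),
  ("Advertiser 2", "Brand3"), ("Advertiser 2", "Brand4"),
  ("Advertiser 3", "Brand5"), ("Advertiser 3", "Brand6")
).toDF("advertiser", "brand")

val brands = Seq(
  ("Brand1", "Brand_name_1"),
  ("Brand3", "Brand_name_3")
).toDF("brand", "brand name")

// Bring the (small) list of allowed brands to the driver.
val filteredBrands = brands.select("brand").collect().map(_.getString(0))

val simplified = df2
  .groupBy("advertiser")
  // when(...) without otherwise(...) yields null for non-matching rows,
  // and collect_list ignores nulls, so only matching brands are kept.
  .agg(collect_list(when(col("brand").isin(filteredBrands: _*), col("brand"))).as("brands"))
  // Keep non-empty arrays; empty arrays become null because there is no otherwise(...).
  .withColumn("brands", when(size(col("brands")) > 0, col("brands")))

simplified.show(false)
```

Both forms should give the same result on this data; the version with array and flatten makes the intermediate steps more explicit, while this one is shorter.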

Join solution

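If your brands DataFrame does not fit in memory, you can first left join df2 with brands to get a column that contains the brand when it is present in the brands DataFrame and null otherwise, then group by advertiser, and finally replace with null the empty arrays produced for advertisers that have none of the brands you want to keep: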

import org.apache.spark.sql.functions.{col, collect_list, size, when}

val finalDataframe = df2
  .join(brands.select(col("brand").as("filtered_brand")), col("filtered_brand") === col("brand"), "left_outer")
  .groupBy("advertiser").agg(collect_list(col("filtered_brand")).as("brands"))
  .withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))

Details

So, if we start with a df2 DataFrame as follows:

+------------+------+
|advertiser  |brand |
+------------+------+
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
+------------+------+

and a brands DataFrame as follows:

+------+------------+
|brand |brand name  |
+------+------------+
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
+------+------------+

After the first left outer join between the df2 and brands DataFrames (the first line), you get the following DataFrame:

+------------+------+--------------+
|advertiser  |brand |filtered_brand|
+------------+------+--------------+
|Advertiser 1|Brand1|Brand1        |
|Advertiser 1|Brand2|null          |
|Advertiser 2|Brand3|Brand3        |
|Advertiser 2|Brand4|null          |
|Advertiser 3|Brand5|null          |
|Advertiser 3|Brand6|null          |
+------------+------+--------------+

When you group this DataFrame by advertiser and collect the list of filtered brands, you get the following DataFrame:

+------------+--------+
|advertiser  |brands  |
+------------+--------+
|Advertiser 2|[Brand3]|
|Advertiser 3|[]      |
|Advertiser 1|[Brand1]|
+------------+--------+

Finally, when you apply the last line, which replaces empty arrays with null, you get the expected result:

+------------+--------+
|advertiser  |brands  |
+------------+--------+
|Advertiser 2|[Brand3]|
|Advertiser 3|null    |
|Advertiser 1|[Brand1]|
+------------+--------+
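
The three steps walked through above can be reproduced with a sketch like the following, which splits the join solution into one value per step so that each intermediate DataFrame can be inspected. The local SparkSession, the app name, the sample data and the names `joined` and `grouped` are assumptions added for illustration. The row order printed by show may differ from the tables above, since grouping does not guarantee an order.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, size, when}

// Local session for illustration only; in spark-shell, getOrCreate() reuses the existing session.
val spark = SparkSession.builder().master("local[*]").appName("join-walkthrough").getOrCreate()
import spark.implicits._

// Sample data mirroring the tables in the question.
val df2 = Seq(
  ("Advertiser 1", "Brand1"), ("Advertiser 1", "Brand2"),
  ("Advertiser 2", "Brand3"), ("Advertiser 2", "Brand4"),
  ("Advertiser 3", "Brand5"), ("Advertiser 3", "Brand6")
).toDF("advertiser", "brand")

val brands = Seq(
  ("Brand1", "Brand_name_1"),
  ("Brand3", "Brand_name_3")
).toDF("brand", "brand name")

// Step 1: left outer join; filtered_brand is null when the brand is not in brands.
val joined = df2.join(
  brands.select(col("brand").as("filtered_brand")),
  col("filtered_brand") === col("brand"),
  "left_outer")
joined.show(false)

// Step 2: collect_list skips nulls, so advertisers without a match get an empty array.
val grouped = joined
  .groupBy("advertiser")
  .agg(collect_list(col("filtered_brand")).as("brands"))
grouped.show(false)

// Step 3: replace empty arrays with null.
val finalDataframe = grouped
  .withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))
finalDataframe.show(false)
```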

Conclusion

The collect solution creates only one expensive shuffle step (during the groupBy) and should be preferred if your brands DataFrame is small. The join solution works if your brands DataFrame is big, but it creates more expensive shuffle steps: one for the join and one for the groupBy.
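
One way to check the shuffle behaviour described above is to print the physical plans and look for Exchange nodes, which mark data being repartitioned across the cluster. The sketch below is only an illustration under stated assumptions: the local SparkSession, the sample data and the names `collectPlan` and `joinPlan` are not from the original answer, and automatic broadcast joins are switched off because Spark may otherwise broadcast a tiny brands table and avoid shuffling for the join, which would hide the difference on sample data this small.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, when}

// Local session for illustration only; in spark-shell, getOrCreate() reuses the existing session.
val spark = SparkSession.builder().master("local[*]").appName("shuffle-check").getOrCreate()
import spark.implicits._

// Assumption for this demo only: disable automatic broadcast joins so the small
// sample tables are joined the way a large brands table would be.
// Note that this changes the setting for the whole session.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val df2 = Seq(
  ("Advertiser 1", "Brand1"), ("Advertiser 1", "Brand2"),
  ("Advertiser 2", "Brand3"), ("Advertiser 2", "Brand4"),
  ("Advertiser 3", "Brand5"), ("Advertiser 3", "Brand6")
).toDF("advertiser", "brand")

val brands = Seq(
  ("Brand1", "Brand_name_1"),
  ("Brand3", "Brand_name_3")
).toDF("brand", "brand name")

// Collect-style aggregation: the brand list lives on the driver, so no join is needed.
// collect_list skips the nulls produced by when(...) for non-matching brands.
val filteredBrands = brands.select("brand").collect().map(_.getString(0))
val collectPlan = df2
  .groupBy("advertiser")
  .agg(collect_list(when(col("brand").isin(filteredBrands: _*), col("brand"))).as("brands"))

// Join-style aggregation: a left outer join followed by the groupBy.
val joinPlan = df2
  .join(brands.select(col("brand").as("filtered_brand")), col("filtered_brand") === col("brand"), "left_outer")
  .groupBy("advertiser")
  .agg(collect_list(col("filtered_brand")).as("brands"))

// Print both physical plans; compare the Exchange nodes that appear in each.
collectPlan.explain()
joinPlan.explain()
```

The exact plan text depends on the Spark version and configuration, so treat the printed plans as something to inspect rather than a fixed expected output.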

