首页 > 解决方案 > 加入大数据帧和小数据帧时的广播数据帧和过滤器

问题描述

我有一个带有列的大数据框user_iduser_address以及多个与用户相关的列(多 10-12 个)和一个带有列user_iduser_contact. 每个用户可以有很多电话号码/电子邮件地址。

我正在尝试广播较小的 Dataframe 并将其数据传递给 UDF。但是,我无法获取每个执行程序中存在的所有数据。No match found如果未找到该 user_id 的详细联系信息,则 UDF 返回。

我尝试按 过滤广播DF user_id,但似乎这不起作用。有人可以在这里指导我吗?

val DF = createDF("/somePath/")
val broadcastedDF = spark.sparkContext.broadcast(DF)
val DFWithUDF = someDF.select( col("user_id),
                               UDF(col("user_name"),
                               typedLit[Seq[String]](broadcastedDF.value.filter(broadcastedDF.value("user_id") === col("user_id")).
                                                        select("user_contact").
                                                        collect().map(data => 
                                                           String.valueOf(data.getAs[String]("user_contact")))
                                                      )
                                                    ).alias(SCHEMA_REQUEST_TARGET)

标签: scalaapache-sparkapache-spark-sql

解决方案


您可以在加入这两个数据帧时使用加入提示来广播较小的数据帧,无需自己专门广播。您的代码可能如下所示:

val bigDf = ... // has columns user_id, user_address
val smallDf = ... // has columns user_id and user_contact

// define your join expression and join those tables using column user_id
import org.apache.spark.sql.functions.broadcast
val joinExpr = bigDf.col("user_id") === smallDf.col("user_id")
val joinedDF = bigDf.join(broadcast(smallDf), joinExpr)

如果您的小型 Dataframe 小于 10MB 并且您没有更改配置spark.sql.autoBroadcastJoinThreshold(默认为 10MB),您甚至不需要在加入时提及广播提示。

配置spark.sql.autoBroadcastJoinThreshold描述为:

“配置在执行连接时将广播到所有工作节点的表的最大大小(以字节为单位)。通过将此值设置为 -1 可以禁用广播。请注意,当前仅支持使用命令 ANALYZE 的 Hive Metastore 表的统计信息表计算统计 noscan 已运行,并且基于文件的数据源表直接在数据文件上计算统计信息。”


推荐阅读