scala - 加入大数据帧和小数据帧时的广播数据帧和过滤器
问题描述
我有一个带有列的大数据框user_id
,user_address
以及多个与用户相关的列(多 10-12 个)和一个带有列user_id
和user_contact
. 每个用户可以有很多电话号码/电子邮件地址。
我正在尝试广播较小的 Dataframe 并将其数据传递给 UDF。但是,我无法获取每个执行程序中存在的所有数据。No match found
如果未找到该 user_id 的详细联系信息,则 UDF 返回。
我尝试按 过滤广播DF user_id
,但似乎这不起作用。有人可以在这里指导我吗?
val DF = createDF("/somePath/")
val broadcastedDF = spark.sparkContext.broadcast(DF)
val DFWithUDF = someDF.select( col("user_id),
UDF(col("user_name"),
typedLit[Seq[String]](broadcastedDF.value.filter(broadcastedDF.value("user_id") === col("user_id")).
select("user_contact").
collect().map(data =>
String.valueOf(data.getAs[String]("user_contact")))
)
).alias(SCHEMA_REQUEST_TARGET)
解决方案
您可以在加入这两个数据帧时使用加入提示来广播较小的数据帧,无需自己专门广播。您的代码可能如下所示:
val bigDf = ... // has columns user_id, user_address
val smallDf = ... // has columns user_id and user_contact
// define your join expression and join those tables using column user_id
import org.apache.spark.sql.functions.broadcast
val joinExpr = bigDf.col("user_id") === smallDf.col("user_id")
val joinedDF = bigDf.join(broadcast(smallDf), joinExpr)
如果您的小型 Dataframe 小于 10MB 并且您没有更改配置spark.sql.autoBroadcastJoinThreshold
(默认为 10MB),您甚至不需要在加入时提及广播提示。
配置spark.sql.autoBroadcastJoinThreshold
描述为:
“配置在执行连接时将广播到所有工作节点的表的最大大小(以字节为单位)。通过将此值设置为 -1 可以禁用广播。请注意,当前仅支持使用命令 ANALYZE 的 Hive Metastore 表的统计信息表计算统计 noscan 已运行,并且基于文件的数据源表直接在数据文件上计算统计信息。”
推荐阅读
- c++ - 如何使 sf::Vector2f transform(float t) 速度更快?
- linux - 无法使用 linux 终端删除 linux 目录
- swift - Swift:“'Any'类型的值没有成员'map'”,数组返回为Any类型
- css - 为什么插入符号会触发 css lint 错误?
- python - Python:检查列表中的每个文件是否存在于目录中
- python - Unstacked DataFrame太大,导致int32溢出
- java - 从图库意图中选择后,图像未显示在图像视图中
- identityserver4 - 如何从 Identity Server 应用程序执行外部身份验证请求?
- computer-vision - CVAT coco 注释 json - iscrowd 选项
- amazon-web-services - Codebuild 失败时使用 Cloudformation 模板发送通知邮件