首页 > 解决方案 > Spark中两个大型数据集之间的交叉连接

问题描述

我有 2 个大型数据集。第一个数据集包含大约 1.3 亿个条目。
第二个数据集包含大约 40000 个条目。数据是从 MySQL 表中获取的。

我需要做一个交叉加入,但我得到

java.sql.SQLException: GC overhead limit exceeded

在 Scala 中执行此操作的最佳最佳技术是什么?

以下是我的代码片段:

val df1 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table1,"id",100,100000,40, MySqlConnection.getConnectionProperties))
val df2 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table2, MySqlConnection.getConnectionProperties))
val df2Cache = df2.repartition(40).cache()
val crossProduct = df1.join(df2Cache)

df1 是较大的数据集,而 df2 是较小的数据集。

标签: scalaapache-sparkapache-spark-sql

解决方案


130M*40K = 52 万亿条记录是存储这些数据所需的 52TB 内存,如果我们假设每条记录是 1 个字节,这肯定不是真的。如果它高达 64 字节(我认为这也是一个非常保守的估计),那么您需要 3.32 PB (!) 的内存来存储数据。这是一个非常大的数量,因此除非您有一个非常大的集群并且该集群内的网络非常快,否则您可能需要重新考虑您的算法以使其工作。

话虽如此,当您执行join两个 SQL 数据集/数据帧中的一个时,Spark 用于存储连接结果的分区数由spark.sql.shuffle.partitions属性控制(请参见此处)。您可能希望将其设置为一个非常大的数字,并将执行者的数量设置为您可以设置的最大数量。然后,您也许可以将您的处理运行到最后。

此外,您可能需要研究该spark.shuffle.minNumPartitionsToHighlyCompress选项;如果您将其设置为少于随机分区数,您可能会获得另一个内存提升。请注意,在最近的 Spark 版本之前,此选项是硬编码常量设置为 2000,因此根据您的环境,您只需设置spark.sql.shuffle.partitions为大于 2000 的数字即可使用它。


推荐阅读