scala - Spark中两个大型数据集之间的交叉连接
问题描述
我有 2 个大型数据集。第一个数据集包含大约 1.3 亿个条目。
第二个数据集包含大约 40000 个条目。数据是从 MySQL 表中获取的。
我需要做一个交叉加入,但我得到
java.sql.SQLException: GC overhead limit exceeded
在 Scala 中执行此操作的最佳最佳技术是什么?
以下是我的代码片段:
val df1 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table1,"id",100,100000,40, MySqlConnection.getConnectionProperties))
val df2 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table2, MySqlConnection.getConnectionProperties))
val df2Cache = df2.repartition(40).cache()
val crossProduct = df1.join(df2Cache)
df1 是较大的数据集,而 df2 是较小的数据集。
解决方案
130M*40K = 52 万亿条记录是存储这些数据所需的 52TB 内存,如果我们假设每条记录是 1 个字节,这肯定不是真的。如果它高达 64 字节(我认为这也是一个非常保守的估计),那么您需要 3.32 PB (!) 的内存来存储数据。这是一个非常大的数量,因此除非您有一个非常大的集群并且该集群内的网络非常快,否则您可能需要重新考虑您的算法以使其工作。
话虽如此,当您执行join
两个 SQL 数据集/数据帧中的一个时,Spark 用于存储连接结果的分区数由spark.sql.shuffle.partitions
属性控制(请参见此处)。您可能希望将其设置为一个非常大的数字,并将执行者的数量设置为您可以设置的最大数量。然后,您也许可以将您的处理运行到最后。
此外,您可能需要研究该spark.shuffle.minNumPartitionsToHighlyCompress
选项;如果您将其设置为少于随机分区数,您可能会获得另一个内存提升。请注意,在最近的 Spark 版本之前,此选项是硬编码常量设置为 2000,因此根据您的环境,您只需设置spark.sql.shuffle.partitions
为大于 2000 的数字即可使用它。
推荐阅读
- javascript - 在网页内导航而不附加 #href(浏览器后退按钮)
- php - 在使用数据库作为队列连接的 laravel 作业中。但是 queue:work 命令不触发队列?
- r - 将函数应用于特定行
- java - 码头:非法状态异常 - 没有 servlet 的多部分配置
- android - 有没有办法在 Android 中只收听音频音量变化?
- tarantool - 是否可以同时将数据保存在乙烯基和 memtx 中?
- python-3.x - 简单的 bot 命令在 discord.py 中不起作用
- r - 当已经使用 col.region 时,有没有办法在 lattice 包的 levelplot 函数中自定义颜色?
- flutter - Flutter Geolocator Location 不适用于真实手机
- treeview - Forge Viewer v7* 中的 SELECTION_CHANGED_EVENT 问题