apache-spark - Spark加入两个大表并存储在Hive中
问题描述
我正在尝试在 spark 中加入两个大表,最后将其存储回 Hive。我们可以在 Hive 本身中完成,但这需要大约 1 小时,因此我们正在尝试查看 spark 是否可以提供更好的性能。
下面是代码片段。
val m_d = hiveContext.sql("select cp,c,s,t,d,a,tr,n from m_d").repartition(2000,$"c",$"s",$"t",$"d",$"a")
//-- count 60321929
val m_e = hiveContext.sql("select cp,c,s,t,d,a,tr,n from m_e").repartition(2000,$"c",$"s",$"t",$"d",$"a")
//-- count 268135916
m_d.registerTempTable( "m_d" )
m_e.registerTempTable( "m_e" )
val joined_df = m_d.join(m_e, m_d("c") === m_e("c")
&& m_d("s") === m_e("s") && m_d("t") === m_e("t")
&& m_d("d") === m_e("d") && m_d("a") === m_e("a")
)
joined_df.registerTempTable( "joined_df" )
joined_df.cache()
joined_df.first()
joined_df.show()
显示工作正常,但即使我运行
joined_df.count
或者
hiveContext.sql("create table joined_df as select * from joined_df");
或者
joined_df.rdd.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}.toDF("partition_number","number_of_records").collect.foreach(println)
我得到以下错误。
java.lang.IllegalArgumentException:大小超过 Integer.MAX_VALUE
当我用谷歌搜索错误时,似乎其中一个分区大小超过了 20gb 左右。因此我收到此错误。但我认为加入中使用的列对于加入来说非常独特,应该扭曲数据。此外,我将分区更改为 2000 以确保数据均匀分布。对于这个阶段,我得到了 Shuffle Read - 15.9 GB。输入 - 59.7 MB。
非常感谢任何改进连接或克服错误的建议。请注意,我没有尝试过广播连接,因为我虽然两个数据都很大。
解决方案
推荐阅读
- argocd - argocd 的默认密码是多少?
- c# - 这个注释有什么不同吗[DataType(DataType.Text)]
- android - Android中的AES加密/解密
- java - 无法保存主要的 Jenkins 配置?
- angular - 通过使用 ng-packagr 向 ng-package.json 添加“资产”,全局样式未应用于 Angular 库
- wso2 - 用户帐户暂停在 WSO2 身份服务器 5.10.0 中不起作用
- python - 导入 SpacyTextBlob 显示 Attributeerror
- php - 3次错误登录尝试后如何在laravel上再次启用锁定用户
- javascript - 如何从文本框中的函数分配保存的更新值
- nginx - Nginx中的000和499错误码有什么区别