首页 > 解决方案 > Spark加入两个大表并存储在Hive中

问题描述

我正在尝试在 spark 中加入两个大表,最后将其存储回 Hive。我们可以在 Hive 本身中完成,但这需要大约 1 小时,因此我们正在尝试查看 spark 是否可以提供更好的性能。

下面是代码片段。

val m_d = hiveContext.sql("select cp,c,s,t,d,a,tr,n from m_d").repartition(2000,$"c",$"s",$"t",$"d",$"a")
//-- count 60321929
val m_e = hiveContext.sql("select cp,c,s,t,d,a,tr,n from m_e").repartition(2000,$"c",$"s",$"t",$"d",$"a")
//-- count 268135916

m_d.registerTempTable( "m_d" )
m_e.registerTempTable( "m_e" )


val joined_df = m_d.join(m_e, m_d("c") === m_e("c") 
&& m_d("s") === m_e("s") && m_d("t") === m_e("t")
&& m_d("d") === m_e("d") && m_d("a") === m_e("a") 
)

joined_df.registerTempTable( "joined_df" )
joined_df.cache()
joined_df.first()
joined_df.show()

显示工作正常,但即使我运行

joined_df.count 

或者

hiveContext.sql("create table joined_df as select * from joined_df");

或者

joined_df.rdd.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}.toDF("partition_number","number_of_records").collect.foreach(println)

我得到以下错误。

java.lang.IllegalArgumentException:大小超过 Integer.MAX_VALUE

当我用谷歌搜索错误时,似乎其中一个分区大小超过了 20gb 左右。因此我收到此错误。但我认为加入中使用的列对于加入来说非常独特,应该扭曲数据。此外,我将分区更改为 2000 以确保数据均匀分布。对于这个阶段,我得到了 Shuffle Read - 15.9 GB。输入 - 59.7 MB。

非常感谢任何改进连接或克服错误的建议。请注意,我没有尝试过广播连接,因为我虽然两个数据都很大。

标签: apache-sparkjoin

解决方案


推荐阅读