Spark result size too large - a use case

Problem description

My input data is stored in Cassandra, and I use a table whose primary key consists of year, month, day, and hour as the source for a Spark aggregation. My Spark application:

  1. joins the two tables
  2. takes the joined table and selects the data for each hour
  3. unions the selected hourly chunks
  4. runs an aggregation on the resulting dataset and saves it to Cassandra

Simplified:

val ds1 = spark.read.cassandraFormat(table1, keyspace).load().as[T]
val ds2 = spark.read.cassandraFormat(table2, keyspace).load().as[T]
val dsInput = ds1.join(ds2).coalesce(150)
// one filtered sub-select per hour, unioned together below
val dsUnion = for (x <- hours) yield dsInput.where(col("hour") === x)
val dsResult = mySparkAggregation(dsUnion.reduce(_.union(_)).coalesce(10))
dsResult.write.cassandraFormat(outputTable, keyspace).save()  // outputTable: illustrative name
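
For what it's worth, the physical plan that this loop builds can be inspected in spark-shell before running anything; this is what eventually exposed the problem (see the investigation below):

    // Print the physical plan of the unioned dataset without executing the job.
    dsUnion.reduce(_.union(_)).explain()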


The resulting DAG looks like this (for 3 hours / 3 unions):

[image: Spark DAG of the job]

Everything works fine when I only do a few unions, e.g. 24 (one day), but when I run the Spark job over one month (720 unions) I start getting errors like this:

Total size of serialized results of 1126 tasks (1024.8 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

Another worrying thing is that the job creates about 100k tasks; the stage that fails with the error above contains 74400 tasks and crashes because of maxResultSize after processing 1125 of them. What's more, it seems the data has to be shuffled once per hour (i.e. per union).

I tried to coalesce the number of partitions after each union, but then Spark complains that the tasks are too big.
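
For the record, the limit itself can be raised, though that only treats the symptom, not the 74400-task plan. A minimal sketch (the value is illustrative):

    import org.apache.spark.sql.SparkSession

    // Raise the cap on serialized task results collected back to the driver.
    // Must be set before the SparkContext starts (or passed via --conf to
    // spark-submit); the default is 1g, and 0 disables the check entirely.
    val spark = SparkSession.builder()
      .config("spark.driver.maxResultSize", "2g")
      .getOrCreate()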

I would really appreciate any help or suggestions. I have a feeling I am doing something wrong.


I did some investigation and came to some conclusions. Assume we have two tables,

    CREATE TABLE cb.people (id text PRIMARY KEY, name text)

and

    CREATE TABLE cb.address (people_id text PRIMARY KEY, name text)

with the following data:

cassandra@cqlsh> select * from cb.people;

 id | name
----+---------
  3 | Mariusz
  2 |  Monica
  1 |    John



cassandra@cqlsh> select * from cb.address;

 people_id | name
-----------+--------
         3 | POLAND
         2 |    USA
         1 |    USA

Now I want to get the joined result for ids 1 and 2. There are two possible solutions:

  1. Union two selects from the people table, one for id 1 and one for id 2, then join the result with the address table:

    scala> val people =  spark.read.cassandraFormat("people", "cb").load()
    scala>  val usPeople = people.where(col("id") === "1") union people.where(col("id") === "2")
    scala> val address =  spark.read.cassandraFormat("address", "cb").load()
    scala> val joined = usPeople.join(address, address.col("people_id") === usPeople.col("id"))
    
  2. Join the two tables first, then union the two selects for id 1 and id 2:

    scala> val peopleAddress = address.join(usPeople, address.col("people_id") === usPeople.col("id"))
    scala> val joined2 = peopleAddress.where(col("id") === "1") union peopleAddress.where(col("id") === "2")
    

Both return the same result:

+---------+----+---+------+                                                     
|people_id|name| id|  name|
+---------+----+---+------+
|        1| USA|  1|  John|
|        2| USA|  2|Monica|
+---------+----+---+------+
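
Beyond eyeballing the two outputs, the equivalence can be checked directly in the shell; a small sanity-check sketch:

    // Both differences should be empty if the two variants agree row for row.
    assert(joined.except(joined2).count() == 0)
    assert(joined2.except(joined).count() == 0)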

But looking at the explain output, I can see a big difference:

scala> joined.explain
== Physical Plan ==
*SortMergeJoin [people_id#10], [id#0], Inner
:- *Sort [people_id#10 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(people_id#10, 200)
:     +- *Filter (((people_id#10 = 1) || (people_id#10 = 2)) && isnotnull(people_id#10))
:        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@3077e4aa [people_id#10,name#11] PushedFilters: [Or(EqualTo(people_id,1),EqualTo(people_id,2)), IsNotNull(people_id)], ReadSchema: struct<people_id:string,name:string>
+- *Sort [id#0 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#0, 200)
      +- Union
         :- *Filter isnotnull(id#0)
         :  +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,1)], ReadSchema: struct<id:string,name:string>
         +- *Filter isnotnull(id#0)
            +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,2)], ReadSchema: struct<id:string,name:string>

scala> joined2.explain
== Physical Plan ==
Union
:- *SortMergeJoin [people_id#10], [id#0], Inner
:  :- *Sort [people_id#10 ASC NULLS FIRST], false, 0
:  :  +- Exchange hashpartitioning(people_id#10, 200)
:  :     +- *Filter isnotnull(people_id#10)
:  :        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@3077e4aa [people_id#10,name#11] PushedFilters: [*EqualTo(people_id,1), IsNotNull(people_id)], ReadSchema: struct<people_id:string,name:string>
:  +- *Sort [id#0 ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(id#0, 200)
:        +- Union
:           :- *Filter isnotnull(id#0)
:           :  +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,1)], ReadSchema: struct<id:string,name:string>
:           +- *Filter (isnotnull(id#0) && (id#0 = 1))
:              +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,2), EqualTo(id,1)], ReadSchema: struct<id:string,name:string>
+- *SortMergeJoin [people_id#10], [id#0], Inner
   :- *Sort [people_id#10 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(people_id#10, 200)
   :     +- *Filter isnotnull(people_id#10)
   :        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@3077e4aa [people_id#10,name#11] PushedFilters: [IsNotNull(people_id), *EqualTo(people_id,2)], ReadSchema: struct<people_id:string,name:string>
   +- *Sort [id#0 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#0, 200)
         +- Union
            :- *Filter (isnotnull(id#0) && (id#0 = 2))
            :  +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,1), EqualTo(id,2)], ReadSchema: struct<id:string,name:string>
            +- *Filter isnotnull(id#0)
               +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,2)], ReadSchema: struct<id:string,name:string>

Now it is clear to me that what I was doing was this joined2 version: the join was called inside the loop, once per union, so the physical plan repeats the SortMergeJoin (and the table scans feeding it) once per union branch. I thought Spark would be smart enough to reduce it to the first version...
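
Applied to the original job, the takeaway is to build the join exactly once and only then restrict the hours. A minimal sketch using the hypothetical names from the simplified snippet above; a single isin predicate also removes the need for the per-hour union loop altogether:

    // Join exactly once; select the wanted hours with one predicate instead of
    // one union branch (and hence one join subtree) per hour.
    val dsInput  = ds1.join(ds2)
    val dsMonth  = dsInput.where(col("hour").isin(hours: _*))
    val dsResult = mySparkAggregation(dsMonth.coalesce(10))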

Now the DAG looks much better:

[image: the improved Spark DAG]

I hope others won't make the same mistake I did :) Unfortunately, I had wrapped Spark in my own abstraction layer, which masked this simple problem, so spark-shell was a great help in modelling the issue.

Tags: apache-spark, apache-spark-sql, spark-dataframe
