apache-spark - Spark 结果大小太大 - 用例
问题描述
我的输入数据存储在 Cassandra 中,我使用主键为年、月、日、小时的表作为 Spark 聚合的来源。我的 Spark 应用程序可以
- 连接两个表
- 获取连接表并按小时选择数据
- 按小时联合选择的块
- 对结果数据集进行聚合并保存到 Cassandra
简化
val ds1 = spark.read.cassandraFormat(table1, keyspace).load().as[T]
val ds2 = spark.read.cassandraFormat(table2, keyspace).load().as[T]
val dsInput = ds1.join(ds2).coalesce(150)
val dsUnion = for (x <- hours) yield dsInput.select( where hour = x)
val dsResult = mySparkAggregation( dsUnion.reduce(_.union(_)).coalesce(10) )
dsResult.saveToCassadnra
`
结果图如下所示(3 小时/工会)
当我只做几个工会时一切正常,例如 24 个(一天),但是当我开始运行 Spark 作业 1 个月(720 个工会)时,我开始收到这样的错误
1126 个任务的序列化结果总大小 (1024.8 MB) 大于 spark.driver.maxResultSize (1024.0 MB)
另一个令人担忧的事情是,该作业创建了大约 10 万个任务,其中一个阶段(导致上述错误的那个阶段)包含 74400 个任务,当它处理 1125 个任务时,由于 maxResultSize 而崩溃。更重要的是,它似乎必须每小时对数据进行洗牌(联合)。
我试图在联合后合并任务的数量——而不是说任务太大。
我将非常感谢任何帮助,建议?我感觉我做错了什么。
我做了一些调查并得出了一些结论假设我们有两张桌子
cb.people CREATE TABLE cb.people ( id text PRIMARY KEY, name text )
和 cb.address CREATE TABLE cb.address (people_id text PRIMARY KEY, name text)
有以下数据
cassandra@cqlsh> select * from cb.people;
id | name
----+---------
3 | Mariusz
2 | Monica
1 | John
cassandra@cqlsh> select * from cb.address;
people_id | name
-----------+--------
3 | POLAND
2 | USA
1 | USA
现在我想获得 id 1 和 2 的连接结果。有两种可能的解决方案。
联合二从表people中选择id 1和2,然后与地址表join
scala> val people = spark.read.cassandraFormat("people", "cb").load() scala> val usPeople = people.where(col("id") === "1") union people.where(col("id") === "2") scala> val address = spark.read.cassandraFormat("address", "cb").load() scala> val joined = usPeople.join(address, address.col("people_id") === usPeople.col("id"))
加入两个表,然后为 id 1 和 2 联合两个选择
scala> val peopleAddress = address.join(usPeople, address.col("people_id") === usPeople.col("id")) scala> val joined2 = peopleAddress.where(col("id") === "1") union peopleAddress.where(col("id") === "2")
两者都返回相同的结果
+---------+----+---+------+
|people_id|name| id| name|
+---------+----+---+------+
| 1| USA| 1| John|
| 2| USA| 2|Monica|
+---------+----+---+------+
但是看解释我可以看到很大的不同
scala> joined.explain
== Physical Plan ==
*SortMergeJoin [people_id#10], [id#0], Inner
:- *Sort [people_id#10 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(people_id#10, 200)
: +- *Filter (((people_id#10 = 1) || (people_id#10 = 2)) && isnotnull(people_id#10))
: +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@3077e4aa [people_id#10,name#11] PushedFilters: [Or(EqualTo(people_id,1),EqualTo(people_id,2)), IsNotNull(people_id)], ReadSchema: struct<people_id:string,name:string>
+- *Sort [id#0 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#0, 200)
+- Union
:- *Filter isnotnull(id#0)
: +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,1)], ReadSchema: struct<id:string,name:string>
+- *Filter isnotnull(id#0)
+- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,2)], ReadSchema: struct<id:string,name:string>
scala> joined2.explain
== Physical Plan ==
Union
:- *SortMergeJoin [people_id#10], [id#0], Inner
: :- *Sort [people_id#10 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(people_id#10, 200)
: : +- *Filter isnotnull(people_id#10)
: : +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@3077e4aa [people_id#10,name#11] PushedFilters: [*EqualTo(people_id,1), IsNotNull(people_id)], ReadSchema: struct<people_id:string,name:string>
: +- *Sort [id#0 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#0, 200)
: +- Union
: :- *Filter isnotnull(id#0)
: : +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,1)], ReadSchema: struct<id:string,name:string>
: +- *Filter (isnotnull(id#0) && (id#0 = 1))
: +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,2), EqualTo(id,1)], ReadSchema: struct<id:string,name:string>
+- *SortMergeJoin [people_id#10], [id#0], Inner
:- *Sort [people_id#10 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(people_id#10, 200)
: +- *Filter isnotnull(people_id#10)
: +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@3077e4aa [people_id#10,name#11] PushedFilters: [IsNotNull(people_id), *EqualTo(people_id,2)], ReadSchema: struct<people_id:string,name:string>
+- *Sort [id#0 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#0, 200)
+- Union
:- *Filter (isnotnull(id#0) && (id#0 = 2))
: +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,1), EqualTo(id,2)], ReadSchema: struct<id:string,name:string>
+- *Filter isnotnull(id#0)
+- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,2)], ReadSchema: struct<id:string,name:string>
现在对我来说很清楚我所做的是这个joined2版本,在每个联合的循环中都被称为join。我认为 Spark 足够聪明,可以将其减少到第一个版本......
我希望其他人不会犯我犯的同样错误 :) 不幸的是,我用我的抽象级别覆盖了 spark,它涵盖了那个简单的问题,所以 spark-shell 对建模问题有很大帮助。
解决方案
推荐阅读
- reactjs - 用 setTimeout 测试钩子状态更新
- wordpress - Nginx:(111:连接被拒绝)同时连接到上游 wordpress 和 docker
- google-cloud-platform - 谷歌云构建 PyPi 400 错误
- c - 你能检查一下我在 C 中实现递归合并排序函数时出错的地方吗?
- javascript - 无法使用 JavaScript 的 ReadableStream 将 StreamingResponseBody 流转换为 JSON
- javascript - 为什么这段 Javascript 代码返回一个空数组?
- python - 将数据从 hadoop 导出到 Oracle 处理删除和停机时间最短的有效方法是什么
- vb.net - 根据管理员、医生和护士这 3 种用户类型重定向表单
- python-3.x - 在 Numpy 和手动计算之间得到不同的答案
- python - 为什么我没有收到错误但代码不起作用?