python - 在一个 Spark 作业中运行两个操作时“超出 GC 开销限制”;单独运行没问题
问题描述
我有以下 Spark SQL 代码来检查大型表(数十亿行)中是否缺少某些日期:
spark = SparkSession.builder \
.master("yarn") \
.appName("minimal_example") \
.config('spark.submit.deployMode', 'client') \
.getOrCreate()
SQL = '''
select distinct
substr(entrydate, 1, 10) as datum,
1 as in_table
from {table}
where entrydate >= '{datum}'
'''
print("RUN1")
df1 = spark.sql(SQL.format(datum='2017-01-01', table='table1'))
c1 = df1.count()
print("count1: ", c1)
print("RUN2")
df2 = spark.sql(SQL.format(datum='2017-01-01', table='table2'))
c2 = df2.count()
print("count2: ", c2)
本质上,该函数只是从表列中获取不同的日期。
现在我无法理解的部分:
- 每次调用都
count()
运行良好 - 当我将每个呼叫作为单独的
spark-submit
作业运行时,它工作正常 - 但是如果像上面那样连续运行它们,第二次运行会产生以下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o150.sql.
: java.util.concurrent.ExecutionException: java.io.IOException: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: GC overhead limit exceeded
我的解释是,第一次运行的垃圾收集在第二次运行期间开始。
我试过的:
- 在每次迭代开始时调用 spark.clearCache()
- 在每次迭代开始
spark._jvm.SparkSession.clearDefaultSession()
时调用spark._jvm.SparkSession.clearActiveSession()
- 查看 Spark Web UI 并尝试从 DAG 和 Storage 选项卡(后者不显示任何内容)中获得意义,但无济于事
- 改变两个
count
s的顺序。这会导致不同的错误:(有关类似错误,java.io.IOException: Connection reset by peer
请参见此处)
最后一个观察:第一次调用启动 >100 个 Spark/YARN 执行器,也许 Spark 的动态分配机制不喜欢第二次调用实际上是一个对执行器有不同要求的新工作?
任何帮助深表感谢!
环境:Cloudera CDH 6.1 集群上的 Spark 2.3。
编辑:更多细节
- 这些表作为 Parquet 文件保存在 HDFS 中,统计信息:
+--------+------------+-------+--------+--------------+
| table | # rows |# cols |# files | raw size |
+--------+------------+-------+--------+--------------+
| table1 | 5660970439 | 46 | 49167 | 228876171398 |
| table2 | 5656000217 | 52 | 80000 | 518996700170 |
+--------+------------+-------+--------+--------------+
- 内存设置:Spark on YARN 动态分配,最小执行器内存为 1GB,最大为 72GB,集群总内存约为 300GB。
- 第一个
count()
启动了大约 150 个执行器,充分利用了当前可用的内存资源
解决方案
在让问题陷入几天之后,我只是尝试增加驱动程序内存:
spark2-submit --master yarn --deploy-mode client --driver-memory 4G minimal_example.py
也许决定因素是我的应用程序是以client
模式启动的。显然,管理大量执行程序(及其删除)会消耗大量内存,即使驱动程序本身只接收简单的df.count()
.
推荐阅读
- python-3.x - 如何将详细文本映射到一元或二元
- c - Visual Studio C Scanf 扫描两次而不是一次,只读取一次
- windows - Windows 的 Git GUI 窗口丢失和/或不可见
- python - 2013, '在查询期间丢失与 MySQL 服务器的连接 - Pymysql Microsoft sql server management Studio
- vba - 在 VBA 中实现多个接口的接口
- node.js - 如何重新连接到 websocket
- swift - SpriteKit presentScene 不适用于过渡,只有没有
- linux - 如何在 bash 脚本根模式下获取用户的 $HOME 目录?
- audio - 视频是 25 fps,音频是 50 fps?
- reactjs - React-Router V6 检查路径是否匹配模式