Spark-submit process leak on Python jobs

Problem description

I'm running into a strange issue where spark-submit processes hang around indefinitely and leak memory after their jobs have completed.

I consistently have exactly 3 spark-submit processes hanging around, left over from the first 3 jobs submitted to the cluster in client mode. Sample from the client:

root 1517 0.3 4.7 8412728 1532876 ? Sl 18:49 0:38 /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/jars/*:/usr/local/hadoop-2.7.7/etc/hadoop/:/usr/local/hadoop-2.7.7/share/hadoop/common/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/common/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar -Xmx2g org.apache.spark.deploy.SparkSubmit --conf spark.driver.port=46101 --conf spark.master=spark://3c520b0c6d6e:7077 --conf spark.scheduler.allocation.file=/home/jovyan/work/spark_scheduler_allocation.xml --conf spark.app.name=REDACTED --conf spark.driver.bindAddress=3c520b0c6d6e --conf spark.fileserver.port=46102 --conf packages=org.apache.kudu:kudu-spark2_2.11:1.12.0 --conf spark.broadcast.port=46103 --conf spark.driver.host=3c520b0c6d6e --conf spark.replClassServer.port=46104 --conf spark.executorEnv.AF_ALERTS_STREAM_KEY=ALERTS_STREAM_LIST --conf spark.scheduler.mode=FAIR --conf spark.shuffle.service.enabled=True --conf spark.blockManager.port=46105 --conf spark.dynamicAllocation.enabled=true pyspark-shell

root 1746 0.4 3.5 8152640 1132420 ? Sl 18:59 0:36 /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/jars/*:/usr/local/hadoop-2.7.7/etc/hadoop/:/usr/local/hadoop-2.7.7/share/hadoop/common/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/common/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar -Xmx2g org.apache.spark.deploy.SparkSubmit --conf spark.driver.port=46101 --conf spark.master=spark://3c520b0c6d6e:7077 --conf spark.scheduler.allocation.file=/home/jovyan/work/spark_scheduler_allocation.xml --conf spark.app.name=REDACTED --conf spark.driver.bindAddress=3c520b0c6d6e --conf spark.fileserver.port=46102 --conf packages=org.apache.kudu:kudu-spark2_2.11:1.12.0 --conf spark.broadcast.port=46103 --conf spark.driver.host=3c520b0c6d6e --conf spark.replClassServer.port=46104 --conf spark.executorEnv.AF_ALERTS_STREAM_KEY=ALERTS_STREAM_LIST --conf spark.scheduler.mode=FAIR --conf spark.shuffle.service.enabled=True --conf spark.blockManager.port=46105 --conf spark.dynamicAllocation.enabled=true pyspark-shell

root 2239 65.3 7.8 9743456 2527236 ? Sl 19:10 91:30 /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/jars/*:/usr/local/hadoop-2.7.7/etc/hadoop/:/usr/local/hadoop-2.7.7/share/hadoop/common/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/common/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar -Xmx2g org.apache.spark.deploy.SparkSubmit --conf spark.driver.port=46101 --conf spark.master=spark://3c520b0c6d6e:7077 --conf spark.scheduler.allocation.file=/home/jovyan/work/spark_scheduler_allocation.xml --conf spark.app.name=REDACTED --conf spark.driver.bindAddress=3c520b0c6d6e --conf spark.fileserver.port=46102 --conf packages=org.apache.kudu:kudu-spark2_2.11:1.12.0 --conf spark.broadcast.port=46103 --conf spark.driver.host=3c520b0c6d6e --conf spark.replClassServer.port=46104 --conf spark.executorEnv.AF_ALERTS_STREAM_KEY=ALERTS_STREAM_LIST --conf spark.scheduler.mode=FAIR --conf spark.shuffle.service.enabled=True --conf spark.blockManager.port=46105 --conf spark.dynamicAllocation.enabled=true pyspark-shell

The corresponding jobs show as "FINISHED" in the Spark UI, and according to their logs they closed their sessions and exited. These jobs no longer consume any worker resources, and subsequent jobs are able to receive the maximum number of executors and run as expected. However, these 3 processes keep consuming memory at a slowly increasing rate, eventually causing an OOM when a new driver tries to allocate.

A thread listing of process 1517 above shows the following user threads (daemon threads omitted):

"Thread-4" #16 prio=5 os_prio=0 tid=0x00007f8fe4008000 nid=0x61c runnable [0x00007f9029227000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        - locked <0x00000000800f8a88> (a java.io.InputStreamReader)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        - locked <0x00000000800f8a88> (a java.io.InputStreamReader)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at py4j.GatewayConnection.run(GatewayConnection.java:230)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Thread-3" #15 prio=5 os_prio=0 tid=0x00007f905dab7000 nid=0x61b runnable [0x00007f9029328000]
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:560)
        at java.net.ServerSocket.accept(ServerSocket.java:528)
        at py4j.GatewayServer.run(GatewayServer.java:685)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"pool-1-thread-1" #14 prio=5 os_prio=0 tid=0x00007f905daa5000 nid=0x61a waiting on condition [0x00007f902982c000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000008011cda8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"main" #1 prio=5 os_prio=0 tid=0x00007f905c016800 nid=0x604 runnable [0x00007f9062b96000]
   java.lang.Thread.State: RUNNABLE
        at java.io.FileInputStream.readBytes(Native Method)
        at java.io.FileInputStream.read(FileInputStream.java:255)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        - locked <0x0000000080189dc8> (a java.io.BufferedInputStream)
        at org.apache.spark.api.python.PythonGatewayServer$.main(PythonGatewayServer.scala:87)
        at org.apache.spark.api.python.PythonGatewayServer.main(PythonGatewayServer.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

   Locked ownable synchronizers:
        - None
"VM Thread" os_prio=0 tid=0x00007f905c08c000 nid=0x60d runnable

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f905c02b800 nid=0x605 runnable

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f905c02d000 nid=0x606 runnable

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f905c02f000 nid=0x607 runnable

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f905c030800 nid=0x608 runnable

"GC task thread#4 (ParallelGC)" os_prio=0 tid=0x00007f905c032800 nid=0x609 runnable

"GC task thread#5 (ParallelGC)" os_prio=0 tid=0x00007f905c034000 nid=0x60a runnable

"GC task thread#6 (ParallelGC)" os_prio=0 tid=0x00007f905c036000 nid=0x60b runnable

"GC task thread#7 (ParallelGC)" os_prio=0 tid=0x00007f905c037800 nid=0x60c runnable

"VM Periodic Task Thread" os_prio=0 tid=0x00007f905c0e0800 nid=0x616 waiting on condition

I notice that the main thread is blocked on a file-input-stream read coming from PythonGatewayServer, while the remaining user threads appear to be blocked waiting on socket reads. It seems the Python gateways are being retained for some reason.

Any ideas as to the cause?

Tags: apache-spark, pyspark, memory-leaks, out-of-memory, spark-submit

Solution


As @mazaneicha noticed, the spark-submit processes are running a pyspark REPL shell (note the trailing `pyspark-shell` argument in the process listings above). This is because the jobs were submitted from a Python runtime rather than with the proper spark-submit script.

For example, consider this class:

from pyspark.sql import SparkSession

class BatchTask:
    def start_job(self):
        spark = None  # so the finally block is safe if getOrCreate() fails
        try:
            spark = SparkSession.builder.appName('app_name').getOrCreate()
            dataframe = spark.read \
                .format('some.format') \
                .load()
            # ... perform dataframe aggregations ...
            dataframe.collect()
        finally:
            if spark:
                spark.stop()

It turns out that if you instantiate BatchTask and call start_job from any Python runtime on a machine where pyspark is installed and on the PYTHONPATH, the spark-submit process will run a pyspark REPL and execute the application driver inside it. That REPL does not exit and is reused by subsequent jobs submitted this way, accumulating all the daemon threads each job adds, until it eventually exhausts the driver's memory limit and crashes.
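The mechanism can be observed from pyspark's own launch protocol: when a script is started by spark-submit, the Java side (PythonGatewayServer) starts the py4j gateway first and hands its port to the Python child through the PYSPARK_GATEWAY_PORT environment variable; when getOrCreate() is instead called from a plain Python runtime, that variable is absent and pyspark forks its own spark-submit child running pyspark-shell. A minimal sketch of a guard built on that (the helper name is mine, not part of any API):

```python
import os

def launched_by_spark_submit():
    """True if this Python process appears to have been started by spark-submit."""
    # spark-submit's PythonGatewayServer starts the py4j gateway first and
    # passes its port to the Python child in this environment variable.
    # A bare `python my_job.py` run won't have it set, and pyspark will
    # fork its own spark-submit child (pyspark-shell) instead.
    return "PYSPARK_GATEWAY_PORT" in os.environ
```

A job entry point could refuse to start when the variable is missing, e.g. `if not launched_by_spark_submit(): raise SystemExit("run me with spark-submit")`, instead of silently spawning a leak-prone child.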

The solution: don't submit Spark applications this way. Use the spark-submit script instead.
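Concretely, if jobs must still be triggered from a long-running Python service, one pattern is to shell out to spark-submit for each job rather than calling getOrCreate() in-process, so every driver lives in its own short-lived process. A sketch (the helper names and sample options are illustrative, not from the original post):

```python
import subprocess

def build_submit_cmd(script, master, app_name, conf=None):
    # Assemble a spark-submit invocation; each setting is passed
    # as a separate --conf key=value pair.
    cmd = ["spark-submit", "--master", master, "--name", app_name]
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(script)
    return cmd

def run_job(script, master, app_name, conf=None):
    # The spark-submit process exits when the driver finishes, so no
    # lingering pyspark-shell gateway is left to accumulate threads.
    subprocess.run(build_submit_cmd(script, master, app_name, conf), check=True)
```

For example, `run_job("batch_task.py", "spark://3c520b0c6d6e:7077", "my_app", {"spark.scheduler.mode": "FAIR"})` launches one driver per call and reaps it when the job ends.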

