首页 > 解决方案 > Spark 上下文在等待后端时停止

问题描述

我有一个长时间运行的 EMR 步骤,它在 EMR 客户端模式下执行 spark-submit。在作业执行之间,如果有任何配置发生更改,我会在下一次执行之前手动重新启动 Spark 上下文,例如--executor-memory.

当我尝试使用新配置重新启动上下文时遇到以下异常

currentSparkSession.close();
return SparkSession.builder().config(newConfig).getOrCreate();
19/05/23 15:52:35 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalStateException: Spark context stopped while waiting for backend
    at org.apache.spark.scheduler.TaskSchedulerImpl.waitBackendReady(TaskSchedulerImpl.scala:689)
    at org.apache.spark.scheduler.TaskSchedulerImpl.postStartHook(TaskSchedulerImpl.scala:186)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:567)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:923)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:915)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:915)
.
.
.
19/05/23 15:52:35 INFO SparkContext: SparkContext already stopped.
19/05/23 15:52:35 WARN TransportChannelHandler: Exception in connection from /172.31.0.165:42556
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:748)

我尝试让线程休眠一点,以防停止和启动之间需要一些时间,例如:

currentSparkSession.close();
Thread.sleep(5000); // Sleep 5 seconds
return SparkSession.builder().config(newConfig).getOrCreate();

但这也不起作用。我查看了 Spark 源代码,它看起来currentSparkSession.close()在它实际上停止之前不会返回,所以让线程睡眠不会做任何事情。

我还在容器日志中看到了这一点:

Error occurred during initialization of VM
Initial heap size set to a larger value than the maximum heap size
End of LogType:stdout

这让我感到困惑,因为我在执行之间更改的唯一配置是--executor-memory,我实际上减少了它而不是增加它。

我在这个网站上发现了类似的问题,比如Apache Spark running spark-shell on YARN error,但这些建议看起来基本上只是关闭了一些对我来说看起来不太安全的资源管理器验证检查。有什么建议么?

标签: apache-spark

解决方案


这是因为我尝试发送的请求--executor-memory(恰好设置 Xmx,最大堆大小)低于在初始 spark 提交时配置的 Xms(初始堆大小)。由于最大堆大小永远不会小于初始堆大小,因此引发了异常。


推荐阅读