apache-spark - How to make a PySpark/Databricks job keep running and ignore bad records after task failures
Problem description
I am using Databricks with PySpark. The rdd contains a list of file paths, and I process each file with rdd.map(func).collect(), where func processes each file to generate some output. However, func uses some C libraries, and bad files cause a segmentation fault that kills the task process. I cannot catch the error.
Currently the job fails because the task is retried 4 times after a connection reset. I don't want Spark to stop; I want it to keep processing the remaining records, ignore the failed ones, and start a new task if one fails. Ignoring those corrupted files is fine. How can I set this up in Spark/Databricks?
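For reference, Spark's retry budget per task is controlled by spark.task.maxFailures (4 by default); once a task exhausts it, the whole job is aborted, and as far as I know there is no built-in setting that skips a permanently failing task and lets the job continue. A minimal sketch of raising the retry budget (this only buys more retries; it does not ignore bad records, which is why the per-record handling discussed below is needed):

```python
from pyspark import SparkConf, SparkContext

# Allow each task to fail up to 8 times before the job is aborted.
# Note: spark.task.maxFailures must be set when the context is created;
# on Databricks this would go into the cluster's Spark config instead.
conf = SparkConf().set("spark.task.maxFailures", "8")
sc = SparkContext(conf=conf)
```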
Here is my multiprocessing version with a timeout: if a segmentation fault or a timeout occurs, the child process is simply killed and we move on to the next input without crashing the main process. It works fine without Spark, but inside Spark it fails with a pickle error, and Databricks does not seem to support multiprocessing.
from multiprocessing import Pool, cpu_count

with Pool(min(process_no, cpu_count())) as pool:
    async_results = []
    for input in inputs:
        res = pool.apply_async(run_single, (input,))
        async_results.append(res)
    for res in async_results:
        try:
            res.get(timeout=20)
        except Exception as e:
            print("We lacked patience and got a multiprocessing.TimeoutError")
            continue
Update:
Here is the code. I have 4 machines with 16 cores each (so 64 parallel tasks are running).
def func(input):
    import signal

    def call_the_actual_logic(input):
        .....
        return [0]

    def timeoutHandler(signum, frame):
        raise TimeoutError("Task takes too long")

    try:
        # install a timeout handler inside of the Spark task
        # following https://docs.python.org/3/library/signal.html#example
        signal.signal(signal.SIGALRM, timeoutHandler)
        signal.alarm(10)  # cancel the task after 10 seconds
        # call the actual logic
        result = call_the_actual_logic(input)
        signal.alarm(0)
        return result
    except TimeoutError:
        print("Logic for input {} took too long. Cleaning up...".format(input))
        return []
    except Exception as e:
        print("Logic for input {} threw an exception. Cleaning up...".format(input))
        return []

a = rdd.repartition(2048).flatMap(func).collect()
Here is the exception Spark throws. I suspect some workers crash because of a few corrupted files and uncaught errors, since I do see some processing errors; judging from the error below, though, it seems to be a different exception. If a worker crashes, is it possible to ignore the corrupted file and restart the worker?
org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 49.0 failed 4 times, most recent failure: Lost task 162.3 in stage 49.0 (TID 20313, 10.139.64.4, executor 4): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-3638575280205630> in <module>
87
88
---> 89 a = rdd.repartition(2048).flatMap(main_run_spark).collect()
/databricks/spark/python/pyspark/rdd.py in collect(self)
901 # Default path used in OSS Spark / for non-credential passthrough clusters:
902 with SCCallSiteSync(self.context) as css:
--> 903 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
904 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
905
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
125 def deco(*a, **kw):
126 try:
--> 127 return f(*a, **kw)
128 except py4j.protocol.Py4JJavaError as e:
129 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 49.0 failed 4 times, most recent failure: Lost task 162.3 in stage 49.0 (TID 20313, 10.139.64.4, executor 4): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:636)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:625)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:743)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:721)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:556)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1011)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2373)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:677)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:680)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:728)
... 30 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2333)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2354)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2373)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1011)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:395)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1010)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:260)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.GeneratedMethodAccessor559.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:636)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:625)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:743)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:721)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:556)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1011)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2373)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:677)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:680)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:728)
... 30 more
Solution
You can implement error and/or timeout handling inside func, so that exceptions are handled independently for each Spark task.
To invoke the function, use flatMap instead of map so that func can return one result or none:
def func(input):
    import signal

    def timeoutHandler(signum, frame):
        raise TimeoutError("Task takes too long")

    try:
        # install a timeout handler inside of the Spark task
        # following https://docs.python.org/3/library/signal.html#example
        signal.signal(signal.SIGALRM, timeoutHandler)
        signal.alarm(5)  # cancel the task after 5 seconds
        # call the actual logic
        result = call_the_actual_logic(input)
        signal.alarm(0)
        # return an iterator with a single element
        return iter([result])
    except TimeoutError:
        # handle a timeout and return an empty iterator
        print("Logic for input {} took too long. Cleaning up...".format(input))
        return iter([])
    except Exception:
        # handle any other exception and return an empty iterator
        print("Logic for input {} threw an exception. Cleaning up...".format(input))
        return iter([])

def call_the_actual_logic(input):
    # call the C libraries and return the result
    return ...

# trigger the calculation
rdd2 = rdd.flatMap(func)
Disclaimer: I tested this code on a plain Spark installation on a Linux machine, not in a Databricks notebook.
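One caveat with the signal-based approach: a segmentation fault inside the C library kills the Python worker process itself, so no in-process handler (signal or try/except) can catch it. A possible workaround, sketched below and not tested on Databricks, is to isolate the fragile call in a child process and treat a non-zero exit code as a bad record. Here process_one_file.py is a hypothetical script that processes a single file and prints its result to stdout:

```python
import subprocess
import sys

def func_isolated(path):
    # Run the fragile C-library logic in a child process so that a
    # segmentation fault kills only the child, not the Spark worker.
    # "process_one_file.py" is a hypothetical per-file driver script.
    try:
        proc = subprocess.run(
            [sys.executable, "process_one_file.py", path],
            capture_output=True, timeout=20, text=True,
        )
    except subprocess.TimeoutExpired:
        return []          # took too long: skip this record
    if proc.returncode != 0:
        return []          # crashed (e.g. segfault): skip this record
    return [proc.stdout]

# then: rdd.flatMap(func_isolated).collect()
```

Starting an interpreter per file adds overhead, but the crash can then never take down the worker, so Spark's task-retry limit is no longer hit by corrupted files.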