How to keep a PySpark/Databricks job running after task failures and ignore bad records

Problem description

I am using Databricks with PySpark. An `rdd` contains a list of file paths; I process each file and produce some output.

I call `rdd.map(func).collect()`, where `func` processes each file to produce some output. However, `func` uses some C libraries, and bad files cause a segmentation fault in the task process. I cannot catch that error in Python.

Currently the job fails because the task is retried 4 times after the connection reset. I don't want Spark to stop; I want it to keep processing the remaining records, ignore the failed ones, and start a new task when one fails. Skipping those corrupted files is perfectly acceptable. How can I set this up in Spark/Databricks?
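For reference, the "failed 4 times" comes from Spark's task retry limit, `spark.task.maxFailures`, which defaults to 4. Raising it only retries the same bad file more often; it does not skip the record. A minimal sketch, assuming a vanilla Spark session (on Databricks this setting would instead go into the cluster's Spark config):

from pyspark.sql import SparkSession

# spark.task.maxFailures defaults to 4, which is where "Task ... failed 4 times"
# comes from; a higher value only retries the same bad file more often
spark = (SparkSession.builder
         .config("spark.task.maxFailures", "8")  # illustrative value
         .getOrCreate())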

Here is a multiprocessing approach with a timeout: if a segmentation fault causes a timeout, just kill that process and continue with the next one without crashing the main process. It works fine without Spark, but fails inside Spark with a pickle error, and Databricks does not seem to support multiprocessing.

import multiprocessing
from multiprocessing import Pool, cpu_count

with Pool(min(process_no, cpu_count())) as pool:
    # submit every input asynchronously first ...
    async_results = [pool.apply_async(run_single, (input,)) for input in inputs]
    # ... then collect the results, skipping any task that times out or fails
    for res in async_results:
        try:
            res.get(timeout=20)
        except multiprocessing.TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")
        except Exception as e:
            # any other failure inside run_single: skip this input as well
            print("run_single failed: {}".format(e))

Update:

Here is the code. I have 4 machines with 16 cores each (running 64 parallel tasks).

def func(input):
    import signal

    def call_the_actual_logic(input):
        # ... actual per-file processing with the C libraries ...
        return [0]

    def timeoutHandler(signum, frame):
        raise TimeoutError("Task takes too long")

    try:
        # install a timeout handler inside of the Spark task
        # following https://docs.python.org/3/library/signal.html#example
        signal.signal(signal.SIGALRM, timeoutHandler)
        signal.alarm(10)  # cancel the task after 10 seconds

        # call the actual logic
        result = call_the_actual_logic(input)
        signal.alarm(0)  # disarm the alarm on success

        return result
    except TimeoutError:
        print("Logic for input {} took too long. Cleaning up...".format(input))
        return []
    except Exception:
        print("Logic for input {} threw an exception. Cleaning up...".format(input))
        return []

a = rdd.repartition(2048).flatMap(func).collect()

Here is the exception Spark throws. I suspect some workers crashed because of corrupted files and uncaught errors, since I do see some processing errors in the logs. Judging from the error below, this seems to be a different exception. If a worker crashes, is it possible to ignore the corrupted file and restart the worker?

org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 49.0 failed 4 times, most recent failure: Lost task 162.3 in stage 49.0 (TID 20313, 10.139.64.4, executor 4): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-3638575280205630> in <module>
     87 
     88 
---> 89 a = rdd.repartition(2048).flatMap(main_run_spark).collect()

/databricks/spark/python/pyspark/rdd.py in collect(self)
    901         # Default path used in OSS Spark / for non-credential passthrough clusters:
    902         with SCCallSiteSync(self.context) as css:
--> 903             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    904         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    905 

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    125     def deco(*a, **kw):
    126         try:
--> 127             return f(*a, **kw)
    128         except py4j.protocol.Py4JJavaError as e:
    129             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 49.0 failed 4 times, most recent failure: Lost task 162.3 in stage 49.0 (TID 20313, 10.139.64.4, executor 4): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:636)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:625)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:743)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:721)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:556)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1011)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2373)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:677)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:680)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:728)
    ... 30 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2333)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2354)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2373)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1011)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:395)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1010)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:260)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.GeneratedMethodAccessor559.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:636)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:625)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:743)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:721)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:556)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1011)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2373)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:677)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:680)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:728)
    ... 30 more

Tags: apache-spark, pyspark, databricks

Solution


You can implement error and/or timeout handling inside `func` so that exceptions are handled independently within each Spark task.

To invoke the function, use `flatMap` instead of `map` so that `func` can return one result or none:

def func(input):
    import signal

    def timeoutHandler(signum, frame):
        raise TimeoutError("Task takes too long")

    try:
        # install a timeout handler inside of the Spark task
        # following https://docs.python.org/3/library/signal.html#example
        signal.signal(signal.SIGALRM, timeoutHandler)
        signal.alarm(5)  # cancel the task after 5 seconds

        # call the actual logic
        result = call_the_actual_logic(input)
        signal.alarm(0)  # disarm the alarm on success

        # return an iterator with a single element
        return iter([result])

    except TimeoutError:
        # handle a timeout and return an empty iterator
        print("Logic for input {} took too long. Cleaning up...".format(input))
        return iter([])
    except Exception:
        # handle any other exception and return an empty iterator
        print("Logic for input {} threw an exception. Cleaning up...".format(input))
        return iter([])

def call_the_actual_logic(input):
    # call the C libraries and return the result
    return ...

# build the transformed RDD; flatMap is lazy, so nothing runs yet
rdd2 = rdd.flatMap(func)
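Since `flatMap` is only a transformation, an action is still needed to actually run `func`, for example:

# executing an action runs func on every input; inputs that timed out or
# raised an exception simply contribute no elements to the result
results = rdd2.collect()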

Disclaimer: I tested this code on a plain Spark installation on a Linux machine, not in a Databricks notebook.
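One caveat: a `SIGALRM` handler can catch hangs and Python-level exceptions, but a segmentation fault in the C library kills the Python worker before any handler runs. A hedged sketch of a way around that, assuming the C call can be wrapped in a standalone script (`process_one_file.py` is a hypothetical name), is to run each file in a short-lived child process inside `func`:

import subprocess
import sys

def func(path):
    # run the crash-prone C-library logic in a child process, so that a
    # segfault kills only the child and the Spark Python worker survives
    try:
        proc = subprocess.run(
            [sys.executable, "process_one_file.py", path],  # hypothetical wrapper script
            capture_output=True, text=True, timeout=20)
    except subprocess.TimeoutExpired:
        print("Skipping {}: timed out".format(path))
        return iter([])
    if proc.returncode != 0:
        # a negative return code means the child was killed by a signal,
        # e.g. -11 for SIGSEGV on Linux
        print("Skipping {}: exit code {}".format(path, proc.returncode))
        return iter([])
    return iter([proc.stdout])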

