首页 > 解决方案 > 在 Spark 工作人员上更改 PYSPARK_PYTHON

问题描述

我们分发使用 Spark 的 Python 应用程序和 Python 3.7 解释器(python.exe所有必要的库都在附近MyApp.exe)。

设置PYSPARK_PYTHON我们有一个函数来确定我们的路径python.exe

os.environ['PYSPARK_PYTHON'] = get_python()  

在 WindowsPYSPARK_PYTHON上将成为C:/MyApp/python.exe
在 UbuntuPYSPARK_PYTHON上将成为/opt/MyApp/python.exe

我们启动主/驱动节点并SparkSession在 Windows 上创建。然后我们在 Ubuntu 上启动工作节点,但工作节点失败:

Job aborted due to stage failure: Task 1 in stage 11.0 failed 4 times, most recent failure: Lost task 1.3 in stage 11.0 (TID 1614, 10.0.2.15, executor 1): java.io.IOException: Cannot run program "C:/MyApp/python.exe": error=2, No such file or directory

当然,C:/MyApp/python.exe在 ubuntu 上是没有的。

如果我正确理解此错误,PYSPARK_PYTHON则驱动程序将发送给所有工作人员。

还尝试设置PYSPARK_PYTHON和。我怎样才能改变Ubuntu 工作者成为?spark-env.shspark-defaults.confPYSPARK_PYTHON/opt/MyApp/python.exe

标签: pythonapache-sparkubuntupysparkenvironment-variables

解决方案


浏览源代码,看起来 Python 驱动程序代码在创建运行 Python 函数的工作项时将 Python 可执行路径的值从其 Spark 上下文中放入spark/rdd.py

def _wrap_function(sc, func, deserializer, serializer, profiler=None):
    assert deserializer, "deserializer should not be empty"
    assert serializer, "serializer should not be empty"
    command = (func, profiler, deserializer, serializer)
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
                                                                             ^^^^^^^^^^^^^
                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)

然后 PythonPythonRunner.scala运行程序使用存储在它接收到的第一个工作项中的路径来启动新的解释器实例:

private[spark] abstract class BasePythonRunner[IN, OUT](
    funcs: Seq[ChainedPythonFunctions],
    evalType: Int,
    argOffsets: Array[Array[Int]])
  extends Logging {
  ...
  protected val pythonExec: String = funcs.head.funcs.head.pythonExec
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  ...
  def compute(
      inputIterator: Iterator[IN],
      partitionIndex: Int,
      context: TaskContext): Iterator[OUT] = {
    ...
    val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
    ...
  }
  ...
}

基于此,恐怕目前似乎不可能在 master 和 worker 中对 Python 可执行文件进行单独的配置。另请参阅发布SPARK-26404的第三条评论。也许您应该向 Apache Spark 项目提交 RFE。

虽然我不是 Spark 专家,但可能仍然有办法做到这一点,也许通过设置PYSPARK_PYTHON为 just"python"然后确保系统PATH进行了相应配置,以便您的 Python 可执行文件首先出现。


推荐阅读