Using PySpark modules with YARN

Problem description

I am using the pyspark shell in yarn-client mode. I created an archive with conda pack in order to share some Python modules, but I am running into some problems.

I launch:

pyspark --packages org.apache.spark:spark-avro_2.12:3.0.1 --archives /tmp/testST.tar.gz

where /tmp/testST.tar.gz is a conda environment, created with:

conda pack -n testSATE -o /tmp/testST.tar.gz
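
For reference, the pattern usually shown for conda-pack archives on YARN gives the archive an alias and points the executor interpreter inside the unpacked directory; a minimal sketch of that pattern (the alias environment is just a placeholder name I chose):

export PYSPARK_DRIVER_PYTHON=python             # in client mode the driver keeps the local interpreter
export PYSPARK_PYTHON=./environment/bin/python  # executors use the Python shipped inside the archive
pyspark --packages org.apache.spark:spark-avro_2.12:3.0.1 --archives /tmp/testST.tar.gz#environment

As far as I understand, in yarn-client mode the archive is only unpacked on the executors, so an import typed at the shell prompt still goes through the driver's local Python.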

When I import the pyarrow module (which is inside the archive), I get:

>>> import pyarrow
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ModuleNotFoundError: No module named 'pyarrow'

The same thing happens if I launch pyspark as:

pyspark --packages org.apache.spark:spark-avro_2.12:3.0.1 --conf spark.yarn.dist.archives=/tmp/testST.tar.gz
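
As far as I can tell, on YARN the --archives option just sets spark.yarn.dist.archives, so the conf-based form presumably needs the same #alias suffix and interpreter settings; a sketch (environment is again a placeholder alias):

pyspark --packages org.apache.spark:spark-avro_2.12:3.0.1 \
    --conf spark.yarn.dist.archives=/tmp/testST.tar.gz#environment \
    --conf spark.pyspark.driver.python=python \
    --conf spark.pyspark.python=./environment/bin/python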

After that, I decided to deploy the modules on the machines with conda instead, and to launch pyspark as:

pyspark --packages org.apache.spark:spark-avro_2.12:3.0.1

This time the pyarrow module is found correctly, but when I run:

>>> outputDF = sateFloatDF.withColumn("prediction", loaded_model(sateFloatDF.select("_c0","_c1","_c2")))

I get:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark3/python/pyspark/sql/udf.py", line 197, in wrapper
    return self(*args)
  File "/opt/spark3/python/pyspark/sql/udf.py", line 177, in __call__
    return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
  File "/opt/spark3/python/pyspark/sql/column.py", line 68, in _to_seq
    cols = [converter(c) for c in cols]
  File "/opt/spark3/python/pyspark/sql/column.py", line 68, in <listcomp>
    cols = [converter(c) for c in cols]
  File "/opt/spark3/python/pyspark/sql/column.py", line 56, in _to_java_column
    "function.".format(col, type(col)))
TypeError: Invalid argument, not a string or column: DataFrame[_c0: float, _c1: float, _c2: float] of type <class 'pyspark.sql.dataframe.DataFrame'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
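
That TypeError, if I read it correctly, is just because the UDF is given a whole DataFrame instead of Column arguments; a call with explicit columns would look like this (assuming loaded_model is a Python/pandas UDF returning the prediction):

>>> from pyspark.sql.functions import col
>>> outputDF = sateFloatDF.withColumn("prediction", loaded_model(col("_c0"), col("_c1"), col("_c2")))

which is essentially what I try next: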
>>> outputDF = sateFloatDF.withColumn("prediction", loaded_model(sateFloatDF._c0,sateFloatDF._c1,sateFloatDF._c2))
>>> outputDF.show()
21/04/19 10:39:07 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, gstp-slave-60-01.altecspace.it, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark3/python/pyspark/worker.py", line 589, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/opt/spark3/python/pyspark/worker.py", line 447, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/opt/spark3/python/pyspark/worker.py", line 254, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/opt/spark3/python/pyspark/worker.py", line 74, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark3/python/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark3/python/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/opt/spark3/python/pyspark/cloudpickle.py", line 1110, in subimport
    __import__(name)
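
The traceback is cut off here, but it breaks inside cloudpickle's subimport, i.e. while the executor's Python worker tries to import a module referenced by the pickled UDF; if the executors are still running the system interpreter, they will not see pyarrow even though the driver now does. A sketch of pointing the executors at the environment installed on the nodes (the path /opt/conda/envs/testSATE is purely hypothetical):

export PYSPARK_PYTHON=/opt/conda/envs/testSATE/bin/python   # hypothetical path to the env on each node
pyspark --packages org.apache.spark:spark-avro_2.12:3.0.1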

The pyspark execution flow is still not clear to me.
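
To make that flow a bit more concrete, a minimal check of which interpreter each side is actually using, run from the pyspark shell (just a sketch, I have not captured its output here):

>>> import sys
>>> sys.executable   # interpreter running the driver, i.e. this shell
>>> spark.sparkContext.parallelize(range(2), 2).map(lambda _: sys.executable).collect()   # interpreters used by the executors' Python workers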

Could you help me?

Thanks

Tags: python, pyspark, deployment, module, dependencies

Solution

