python - Using Pyspark modules with YARN
Problem description
I am using the pyspark shell in YARN client mode. I created an archive with conda pack in order to share some Python modules, but I am running into problems.
I launch:
pyspark --packages org.apache.spark:spark-avro_2.12:3.0.1 --archives /tmp/testST.tar.gz
where /tmp/testST.tar.gz is a conda environment created with:
conda pack -n testSATE -o /tmp/testST.tar.gz
When I import the pyarrow module (which is inside the archive), I get:
>>> import pyarrow
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ModuleNotFoundError: No module named 'pyarrow'
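This failure is expected in YARN client mode: the shell's driver process runs with the local system interpreter, so --archives alone does not make the packed modules importable — the archive is shipped to the cluster, but nothing tells Python to use it. The documented conda-pack recipe also sets PYSPARK_PYTHON to an interpreter inside the unpacked archive. A hedged sketch (the `environment` alias after `#` is my own naming, not something from the question):

```shell
# Assumption: /tmp/testST.tar.gz was produced by `conda pack -n testSATE`.
# The "#environment" suffix makes YARN unpack the archive under the alias
# "environment" in each container's working directory.
export PYSPARK_DRIVER_PYTHON=python             # driver-side interpreter (client mode)
export PYSPARK_PYTHON=./environment/bin/python  # executor interpreter inside the shipped env
pyspark --packages org.apache.spark:spark-avro_2.12:3.0.1 \
        --archives /tmp/testST.tar.gz#environment
```

Note that in client mode the driver itself does not unpack the archive, so an `import pyarrow` typed at the shell prompt still needs a local environment that contains pyarrow.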
I get the same error if I launch pyspark as:
pyspark --packages org.apache.spark:spark-avro_2.12:3.0.1 --conf spark.yarn.dist.archives=/tmp/testST.tar.gz
After that, I decided to deploy the modules with conda directly on the machines and launch pyspark as:
pyspark --packages org.apache.spark:spark-avro_2.12:3.0.1
This time the pyarrow module is found correctly, but when I execute:
>>> outputDF = sateFloatDF.withColumn("prediction", loaded_model(sateFloatDF.select("_c0","_c1","_c2")))
I get:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/spark3/python/pyspark/sql/udf.py", line 197, in wrapper
return self(*args)
File "/opt/spark3/python/pyspark/sql/udf.py", line 177, in __call__
return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
File "/opt/spark3/python/pyspark/sql/column.py", line 68, in _to_seq
cols = [converter(c) for c in cols]
File "/opt/spark3/python/pyspark/sql/column.py", line 68, in <listcomp>
cols = [converter(c) for c in cols]
File "/opt/spark3/python/pyspark/sql/column.py", line 56, in _to_java_column
"function.".format(col, type(col)))
TypeError: Invalid argument, not a string or column: DataFrame[_c0: float, _c1: float, _c2: float] of type <class 'pyspark.sql.dataframe.DataFrame'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
>>> outputDF = sateFloatDF.withColumn("prediction", loaded_model(sateFloatDF._c0,sateFloatDF._c1,sateFloatDF._c2))
>>> outputDF.show()
21/04/19 10:39:07 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, gstp-slave-60-01.altecspace.it, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark3/python/pyspark/worker.py", line 589, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/opt/spark3/python/pyspark/worker.py", line 447, in read_udfs
udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
File "/opt/spark3/python/pyspark/worker.py", line 254, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/opt/spark3/python/pyspark/worker.py", line 74, in read_command
command = serializer._read_with_length(file)
File "/opt/spark3/python/pyspark/serializers.py", line 172, in _read_with_length
return self.loads(obj)
File "/opt/spark3/python/pyspark/serializers.py", line 458, in loads
return pickle.loads(obj, encoding=encoding)
File "/opt/spark3/python/pyspark/cloudpickle.py", line 1110, in subimport
__import__(name)
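The truncated executor traceback ends inside cloudpickle's subimport helper, which is where a missing module surfaces on the worker side: when the executor deserializes the UDF it tries to __import__ the UDF's dependencies, and that import fails because the executor's Python does not have them. Installing the conda environment on the driver machine alone is not enough; the executors need it too — either install it on every worker node, or ship the packed archive and point the executor interpreter at it. A hedged sketch for YARN client mode (again, `environment` is my own alias name):

```shell
# Driver keeps using the locally installed conda env (client mode),
# while each executor unpacks the shipped archive under ./environment.
export PYSPARK_DRIVER_PYTHON=$(which python)
export PYSPARK_PYTHON=./environment/bin/python
pyspark --packages org.apache.spark:spark-avro_2.12:3.0.1 \
        --conf spark.yarn.dist.archives=/tmp/testST.tar.gz#environment
```

With this setup the same interpreter environment is available on both sides, so a UDF that imports pyarrow can be unpickled on the executors.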
I am still not clear on the pyspark execution flow.
Can you help me?
Thanks
Solution