python - 在 PySpark 中导入用户定义的模块失败
问题描述
我有以下python代码:
from service import Api
from pyspark.sql import SparkSession
...
spark = SparkSession.builder.appName("App Name").enableHiveSupport().getOrCreate()
myApi= Api()
df = spark.sql('SELECT * FROM hive_table')
def map_function(row):
sql = 'SELECT Name FROM sql_table LIMIT 1'
result = myApi.executeSQL(sql)
if int(row[4]) > 100:
return (result[0][0], row[4])
else:
return (row[3], row[4])
schema = StructType([StructField('Name', StringType(), True), StructField('Value', IntegerType(), True)])
rdd_data = df.rdd.map(map_function)
df1 = spark.createDataFrame(rdd_data, schema)
df1.show()
我创建了一个 Spark DataFrame 并使用 map 函数进行迭代。在 map 函数中,我访问了之前为 SQL 表定义的 Api。
此代码在控制台和 Apache Zeppelin Notebook 中成功运行且没有错误。但是,如果我在脚本中执行它,则会发生以下错误:
ImportError: No module named Api
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
它在访问 map 函数中的 myApi 对象时发生。在服务模块的文件夹中是一个__init__.py
方法,所以这不是问题。
有谁知道问题可能是什么?
解决方案
如果您正在运行您的作业,则spark-submit
需要使用该--py-files
标志提供 python 文件。首先,创建一个.zip
包含所有依赖项的文件:
pip install -t dependencies -r requirements.txt
cd dependencies
zip -r ../dependencies.zip .
最后使用以下方法传递依赖项--py-files
:
spark-submit --py-files dependencies.zip your_spark_job.py
最后在您的 spark 作业脚本中添加以下行:
sc.addPyFile("dependencies.zip")
或者,如果您使用的是 Jupyter Notebook,您所要做的就是将模块的路径附加到 PYTHONPATH:
export PYTHONPATH="${PYTHONPATH}:/path/to/your/service.py"
推荐阅读
- batch-file - 批处理脚本重命名所有子文件夹中的最新文件?
- python - 如何从列表列表中创建字典键?
- javascript - 不和谐每日问候
- javascript - 是否有 Chrome API 可以访问响应模式以调整到自定义宽度和高度?
- html - 如何仅使用 html 在某个网站中显示没有标题的另一个网站?
- linux - Traefik:所有子目录返回 404
- python-3.x - 我想找到所有包含某些单词/单词的标题行,这些单词/单词要被抓取并保存到文本文件中
- python - 使用变量将数字转换为科学计数法python
- laravel - 将验证错误从 Laravel 控制器发送回 Vue 组件的正确方法是什么?
- ios - iOS应用程序空白navigationView在某些设备模拟器上打开屏幕,而不是在其他设备模拟器上