python - PySpark works in the terminal but fails when executed from Python code
Problem
I am trying to read an Avro file. Below is a sample data source I found online to test my code:
https://github.com/Teradata/kylo/blob/master/samples/sample-data/avro/userdata1.avro
Here is my code (assume that source_path is the path to the data linked above):
from pyspark.sql import SparkSession

def avro_reader(source_path: str):
    spark = SparkSession \
        .builder \
        .master("yarn") \
        .enableHiveSupport() \
        .config("hive.exec.dynamic.partition", "true") \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .getOrCreate()
    reader = spark.read.format("com.databricks.spark.avro").load(source_path)
    return reader.show()

print(avro_reader(source_path))
Here is the error I receive:
Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition.mode
Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition
Exception in thread "main" org.apache.spark.SparkException: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
at org.apache.spark.deploy.SparkSubmitArguments.error(SparkSubmitArguments.scala:631)
at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:271)
at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:234)
at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:119)
at org.apache.spark.deploy.SparkSubmit$$anon$2$$anon$3.<init>(SparkSubmit.scala:1022)
at org.apache.spark.deploy.SparkSubmit$$anon$2.parseArguments(SparkSubmit.scala:1022)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:85)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Spark works perfectly fine when I run pyspark in the terminal, so I am not sure what is causing this issue. Below is the output of running pyspark in the terminal:
Python 3.8.2 (default, Apr 8 2021, 23:19:18)
[Clang 12.0.5 (clang-1205.0.22.9)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
21/06/15 01:15:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Python version 3.8.2 (default, Apr 8 2021 23:19:18)
Following the suggestion to remove .master("yarn"), here is the error:
Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition.mode
Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition
21/06/15 12:40:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
File "main.py", line 88, in <module>
print(avro_reader('userdata1.avro'))
File "main.py", line 26, in avro_reader
reader = spark.read.format("com.databricks.spark.avro").load(source_path)
File "/Users/zaki.siyaji/Desktop/avro_proj/venv/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 204, in load
return self._df(self._jreader.load(path))
File "/Users/zaki.siyaji/Desktop/avro_proj/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
return_value = get_return_value(
File "/Users/zaki.siyaji/Desktop/avro_proj/venv/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/Users/zaki.siyaji/Desktop/avro_proj/venv/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o36.load.
: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.avro.AvroFileFormat. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:692)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:746)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.avro.AvroFileFormat.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:666)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:666)
at scala.util.Failure.orElse(Try.scala:224)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
... 14 more
Solution
Remove .master("yarn") if you are running locally rather than on a YARN cluster. If you are running on a YARN cluster, you need to set up the environment correctly (HADOOP_CONF_DIR or YARN_CONF_DIR, as the error message says) and follow the documentation on submitting applications to YARN.
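For the YARN case, a minimal sketch of the environment setup. The /etc/hadoop/conf path is an assumption based on a common Hadoop layout; point it at whatever directory on your machine actually holds the cluster's configuration files (core-site.xml, yarn-site.xml, etc.):

```shell
# Assumed path; substitute your cluster's actual Hadoop config directory.
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
```

When pyspark is launched from a terminal on a properly configured node, these are usually already set by the shell profile, which is why the terminal session works while a bare Python process does not inherit them.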
Update after the new error message:
- You need to change "com.databricks.spark.avro" to "avro", because Spark itself now supports Avro natively.
- And you need to submit the job with the right library (doc):
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:<spark_version>