amazon-web-services - 使用 pyspark 从 AWS s3 Bucket 读取 csv 时出错
问题描述
我正在使用在其中运行 spark-cluster 的 docker-compose 文件。SPARK_VERSION="3.0.0" HADOOP_VERSION="3.2"
所有文件都可以在以下 Github 链接中找到：https://github.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker
我正在尝试使用以下代码从 aws s3 读取 csv 文件:
"""Read a CSV file from an AWS S3 bucket into a Spark DataFrame.

Runs against a standalone Spark cluster (spark-master:7077) and wires up
the Hadoop S3A connector with static credentials.
"""
import os

# Must be set BEFORE any pyspark import so spark-submit pulls in the S3A
# connector and the matching AWS SDK bundle when the JVM starts.
# NOTE(review): hadoop-aws and aws-java-sdk-bundle versions must match the
# Hadoop version Spark was built against, or class-loading errors
# (e.g. NoClassDefFoundError: org/apache/hadoop/fs/StreamCapabilities) occur.
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=com.amazonaws:aws-java-sdk-bundle:1.11.874,org.apache.hadoop:hadoop-aws:3.2.0 pyspark-shell"

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import datetime
from pyspark.sql.types import DateType
from pyspark.sql.functions import col
from pyspark.sql.functions import when
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StringType, IntegerType, DoubleType, LongType, StructType

# Build the session against the standalone cluster master.
sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("pyspark-2")
sparkConf.set("spark.executor.memory", "512m")
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext

# SECURITY: never commit real AWS credentials to source control. These keys
# are exposed and must be rotated; prefer environment variables or an AWS
# credentials provider chain instead of literals.
AWS_ACCESS_KEY_ID = "AKIAXJFEEOGKW4RSZG3B"
AWS_SECRET_ACCESS_KEY = "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"

# Enable AWS Signature Version 4 (required by newer S3 regions).
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

# Configure the Hadoop S3A filesystem with static credentials.
hadoop_conf = sc._jsc.hadoopConfiguration()
#hadoop_conf.set("fs.s3a.endpoint", "s3.us-east-2.amazonaws.com")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# BUG FIX: the original line was missing the closing quote
# (spark.read.csv("s3a://<PATH>) — a SyntaxError before anything else ran).
df = spark.read.csv("s3a://<PATH>")
但我收到以下错误:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-2-24141d88d3e8> in <module>
----> 1 df = spark.read.csv("s3a://migrationawsbucket/allevents.csv")
/usr/local/lib/python3.7/dist-packages/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup)
533 path = [path]
534 if type(path) == list:
--> 535 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
536 elif isinstance(path, RDD):
537 def func(iterator):
/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in _call_(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
129 def deco(*a, **kw):
130 try:
--> 131 return f(*a, **kw)
132 except py4j.protocol.Py4JJavaError as e:
133 converted = convert_exception(e.java_exception)
/usr/local/lib/python3.7/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o37.csv.
: java.lang.NoClassDefFoundError: org/apache/hadoop/fs/StreamCapabilities
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:705)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.StreamCapabilities
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 41 more
我尝试了在此链接中找到的不同解决方案:How can I read from S3 in pyspark running in local mode?
- -更新 - - -
我按照 stevel 的建议，尝试更换 aws-java-sdk-bundle 和 hadoop-aws 的版本，各种版本可以在这里找到：https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
我尝试使用: hadoop-aws-3.2.0.jar
并
aws-java-sdk-bundle-1.11.874.jar
遵循此处找到的答案:AWS EKS Spark 3.0, Hadoop 3.2 Error - NoClassDefFoundError: com/amazonaws/services/s3/model/MultiObjectDeleteException
但同样的错误
解决方案
根本原因是 hadoop-aws JAR 与 aws-sdk 的版本不匹配。恐怕它们不能随意混搭使用。
请使用 mvnrepository 查出你的 spark 版本所需要的 hadoop-* 工件的确切版本，并且不要混用不同版本的 hadoop-* 工件——它们彼此之间的集成与 spark 自身的 JAR 一样紧密。
我建议你升级到使用 hadoop-3.2.x 或更高版本工件的 spark 发行版，这些工件与较新的 1.11 版 AWS 开发工具包兼容。
推荐阅读
- dart - Dart 中的 const 构造函数初始化失败
- python - 将熊猫数据框转换为具有所有组合的字典
- python - 用于 INSERT INTO SELECT 模式的 Python 正则表达式/正则表达式
- c# - Mongodb查询以获取特定于c#的嵌套文档的值
- google-apps-script - 谷歌脚本在编辑时将值从一列复制并粘贴到另一列
- javascript - TypeScript如何访问promise方法中的字段
- dictionary - 如何使用反射反转地图
- informatica - Informatica 服务自动停止
- php - 如何在 PHP docker 镜像上安装 yarn 和 npm(symfony 4 项目)
- excel - 将多个工作簿中的值合并到一个工作簿