apache-spark - 在 AWS EMR 上使用 spark;您提供的 AWS 访问密钥 ID 不存在于我们的记录中。但是 boto3 调用工作得很好
问题描述
我正在尝试在 EMR 上读取 SPARK 中的文件,该文件已在不同系统 (Illumina ICA) 中提供了临时凭据。
当尝试使用 spark.read.csv 读取文件时,使用 S3 URI,它给了我错误:
Py4JJavaError: An error occurred while calling o65.csv.
: java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId;
但是当我使用 BOTO3 调用尝试相同的凭据时,它工作得很好,所以凭据(在环境中)很好。
这是我的测试代码(来自笔记本)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('s3://stratus-gds-use1/241dd164-decb-48f6-eba1-08d881d902b2/dummy.vcf.gz', sep='\t')
#... Py4JJavaError: An error occurred while calling o65.csv. ##: java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId;
access_key_id=os.environ['AWS_ACCESS_KEY_ID']
secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY']
region=os.environ['AWS_DEFAULT_REGION']
session_token=os.environ['AWS_SESSION_TOKEN']
bucket_name='stratus-gds-use1'
key_prefix='241dd164-decb-48f6-eba1-08d881d902b2/dummy.vcf.gz'
import boto3
s3_session = boto3.session.Session(aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
aws_session_token=session_token,
region_name = region)
s3_client = s3_session.client('s3')
%ls -l dummy.vcf.gz
#-=> ls: cannot access dummy.vcf.gz: No such file or directory
r = s3_client.download_file(Filename='dummy.vcf.gz',
Bucket=bucket_name,
Key=key_prefix)
%ls -l dummy.vcf.gz
#-=> -rw-rw-r-- 1 hadoop hadoop 2535 Apr 6 18:45 dummy.vcf.gz
为什么 AWS EMR 上的 spark 无法使用提供的 S3 URI 访问文件的任何想法?
我已经测试了其他类似的 S3 URI,它们工作正常,所以 java 类工作正常。
解决方案
我终于想出了解决方案。我需要在 Spark 配置中提供临时 AWS 凭证并提供特殊类
org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider
以及 session_token..
因此,这是确保 spark 可以使用临时凭证读取 S3 存储桶的过程。
import pyspark
from pyspark.sql import SparkSession
conf = (
pyspark.SparkConf()
.set('spark.hadoop.fs.s3a.aws.credentials.provider','org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider')
.set('spark.hadoop.fs.s3a.access.key', access_key_id)
.set('spark.hadoop.fs.s3a.secret.key', secret_access_key)
.set('spark.hadoop.fs.s3a.session.token', session_token)
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.csv(f's3a://{BUCKET}/{KEY_PREFIX}', sep='\t')
推荐阅读
- python - Python Click:如何为全局上下文更改 `info_name` 的值
- python - 从 csv 文件中读取特定列
- swift - SwiftUI - 无法在 Firebase 的 observeSingleEvent 内分配变量。返回零
- nuget - 依赖项中的 Nuget 包冲突
- javascript - 解析(输入)为 JSON 字符串
- ruby-on-rails - 使用基于 API 密钥的身份验证时使用的正确标头是什么
- android - mvvm 使用 dagger2 和 ViewModelFactory,不理解默认注入
- cython - 如果有同名的属性,Cython 不能引用 cdef 类中的声明类型?
- docker - Docker BuildKit 是否被认为是稳定的/生产友好的?
- javascript - 在输入字段下方使用带有自定义消息的 javascript 进行电子邮件验证