首页 > 解决方案 > 在 PySpark 中读取两个具有不同身份验证令牌的文件

问题描述

PySpark用来将几个文件读入数据帧,并执行它们的联合。由于这两个文件具有不同的权限授予,我正在使用org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider读取这两个文件。但是,当我尝试读取第二个文件时出现错误(我可以单独读取两个文件,但不能一起读取)。

读取文件的代码:

def read_file(file_path, file_id):
    aws_tokens = get_aws_tokens_for_file(file_id)
    spark._jsc.hadoopConfiguration().set('fs.s3a.access.key', aws_access_key)
    spark._jsc.hadoopConfiguration().set('fs.s3a.secret.key', aws_secret_key)
    spark._jsc.hadoopConfiguration().set('fs.s3a.session.token', aws_session_token)
    spark._jsc.hadoopConfiguration().set('fs.s3a.connection.ssl.enabled', 'true')
    spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    df = spark.read.parquet(file_path)
    return df

现在,以下在两个不同的 pyspark-session 中工作:

df1 = read_file(file_1_path, file_1_id)
df2 = read_file(file_2_path, file_2_id)

但是,当我尝试以下操作时,它会失败并显示java.nio.file.AccessDeniedException

df1 = read_file(file_1_path, file_1_id)
df2 = read_file(file_2_path, file_2_id)
df3 = df1.union(df2)
print(df3.count())

造成这种情况的一个原因可能是s3仅在执行操作时才实际读取文件,并且在执行操作时,两个文件所需的 aws-credentials 是不同的。

所以我尝试保留第一个文件,然后读取第二个文件,但同样的异常也失败了:

df1 = read_file(file_1_path, file_1_id)
_ = df1.persist(StorageLevel.MEMORY_AND_DISK).count()

df2 = read_file(file_2_path, file_2_id) #fails here itself

那么如何将需要不同 aws-authentication 凭据的两个此类文件合并?

标签: pyspark

解决方案


Spark 进行按需计算,包括读取数据。S3A FileSystem 类的实例也将被存储桶 URI 缓存...更改配置只有在存储桶不同时才会生效。

您可以使用每个存储桶设置来更改不同 s3 存储桶的凭证/凭证提供程序。如果您的数据位于单独的存储桶中,那应该可以工作。查看 hadoop s3 文档了解详情。


推荐阅读