首页 > 解决方案 > 跨账户 STS 承担角色的 Pyspark S3A 访问被拒绝异常

问题描述

我设置了一个 AWS Glue 作业,用来处理位于另一个 AWS 账户 B 中的 S3 文件。账户 A 中的 IAM 角色(Glue 作业的 IAM 角色)通过 STS 代入(assume)账户 B 中的一个角色,该角色提供对我所需文件的访问权限。账户 B 的 IAM 角色与账户 A 中的 Glue 作业角色建立了信任关系。我能够打印出访问密钥和秘密密钥,因此我认为 STS 工作正常。

我得到以下错误:

An error occurred while calling o83.json. com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied;

既然出现了拒绝访问异常,那么 S3A 连接器的正确配置方式是什么?

这是我的代码:

from __future__ import print_function
from pyspark import SparkContext 
from pyspark import SparkConf
from pyspark import SQLContext 
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col
from pyspark.sql.types import *
from pyspark.sql import HiveContext
from pyspark.sql.functions import explode
from pyspark.sql.functions import explode_outer
from pyspark.sql.functions import substring_index
from pyspark.sql.functions import input_file_name
from pyspark.sql import functions as f 
import sys
import os
import os
import boto3
import sys
import errno
import time
import datetime
from datetime import timedelta, date
from pyspark.sql.functions import split
from pyspark.sql.functions import substring
from boto3.session import Session

# Build (or reuse) a Hive-enabled SparkSession for this job.
spark = (
    SparkSession.builder
    .appName("JsonInputFormat")
    .enableHiveSupport()
    .getOrCreate()
)

sc = spark.sparkContext

# Hive context used to set the Hive options below.
hive_context = HiveContext(sc)

hive_context.setConf("hive.exec.dynamic.partition", "true")
hive_context.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hive_context.setConf("hive.serialization.extend.nesting.levels", "true")

# Second Hive context handle, used later for SQL `set` statements.
sqlCtx = HiveContext(sc)

# Assume the cross-account role through STS. The credentials returned by
# assume_role are temporary: they consist of an access key, a secret key
# AND a session token, and are rejected by S3 (403 Access Denied) when the
# session token is not supplied alongside the key pair.
client = boto3.client('sts')
response = client.assume_role(RoleArn='ROLE_TO_ASSUME', RoleSessionName='AssumeRoleSession1')
credentials = response['Credentials']


ACCESS_KEY = credentials['AccessKeyId']
SECRET_KEY = credentials['SecretAccessKey']
SESSION_TOKEN = credentials['SessionToken']
print('access key is {}'.format(ACCESS_KEY))
# SECURITY: the secret key (and session token) are deliberately not printed;
# job logs are not an appropriate place for credentials.
print("Hadoop version: " + sc._gateway.jvm.org.apache.hadoop.util.VersionInfo.getVersion())
session = Session(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    aws_session_token=SESSION_TOKEN,
)
s3      = session.resource('s3')

# Configure the S3A connector for temporary (session) credentials: the
# TemporaryAWSCredentialsProvider reads the access key, secret key and
# session token from the three fs.s3a.* properties set below.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", ACCESS_KEY)
hadoop_conf.set("fs.s3a.secret.key", SECRET_KEY)
hadoop_conf.set("fs.s3a.session.token", SESSION_TOKEN)
# NOTE(review): kept from the original; confirm this property is actually
# honoured when set on the Hadoop configuration.
hadoop_conf.set("com.amazonaws.services.s3a.enableV4", "true")
hadoop_conf.set("fs.s3a.endpoint", "s3-us-east-1.amazonaws.com")
hadoop_conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")


def flatten_schema(schema):
    """Flatten a schema dict into a list of dotted leaf-field paths.

    ``schema`` is the dict produced by ``schema.jsonValue()``. Nodes that
    carry a list under ``"fields"`` are descended into field by field; nodes
    whose ``"type"`` is itself a dict are descended into via that dict; any
    other node is recorded as a leaf. Paths are built by joining the
    ``"name"`` values met on the way down with ``"."``. Each extended path
    is printed as it is discovered.
    """
    leaves = []
    # Explicit depth-first stack of (node, path-so-far); children are pushed
    # in reverse so they are popped in their original left-to-right order.
    pending = [(schema, "")]
    while pending:
        node, prefix = pending.pop()

        label = node.get("name")
        if label is not None:
            prefix = "{0}.{1}".format(prefix, label) if prefix else label
            print('path is {}'.format(prefix))

        children = node.get("fields")
        inner_type = node.get("type")
        if isinstance(children, list):
            # Struct-like node: visit every field under the current path.
            pending.extend((child, prefix) for child in reversed(children))
        elif isinstance(inner_type, dict):
            # Complex type description: keep descending with the same path.
            pending.append((inner_type, prefix))
        else:
            # Atomic type: the accumulated path names a leaf column.
            leaves.append(prefix)
    return leaves

# Keep column-name matching case sensitive for the SQL layer.
sqlCtx.sql("set spark.sql.caseSensitive=true")

# Work out yesterday's date and today's date as YYYY-MM-DD strings.
yesterday = date.today() - timedelta(days=1)
daybefore = yesterday.strftime("%Y-%m-%d")
currentdate = time.strftime("%Y-%m-%d")
datePrefix = str(daybefore)

# Input location: one "KEY=<date>" prefix inside the bucket.
bucket = 'BUKCET_NAME'
key = 'KEY={}'.format(daybefore)
source = 's3a://{}/{}'.format(bucket, key)

# First pass over the sub-directories of the prefix, used only to obtain
# the inferred schema.
df_base = spark.read.json(source + '/*/')
base_schema = df_base.schema

# Second pass: read the prefix itself with that schema applied.
df1 = spark.read.json(source, schema=base_schema)


# Flatten/explode pipeline: each stage recomputes the flattened column list
# from the current DataFrame schema and re-selects the columns, printing the
# intermediate column lists and DataFrames along the way.

# Stage 1: list every leaf field path of the input DataFrame.
schema=df1.schema.jsonValue()
columns_list=flatten_schema(schema)

print('columns list is {}'.format(columns_list))


# Select every leaf path as its own column, replacing '.' with '_' in the
# resulting column name.
df2 = df1.select(*(col(x).alias(x.replace('.','_')) for x in columns_list))
print('df2 is {}'.format(df2))
# Explode the "contents" column into "contents_flat" (explode_outer), then
# drop the original "contents" column.
df3=df2.select("*",explode_outer(df2.contents).alias("contents_flat"))
df3=df3.drop("contents")
print('df3 is {}'.format(df3))


# Stage 2: flatten again so fields under "contents_flat" become columns.
schema4=df3.schema.jsonValue()
columns_list4=flatten_schema(schema4)
print('columns list 4 is {}'.format(columns_list4))
df5 = df3.select(*(col(x).alias(x.replace('.','_')) for x in columns_list4))
print('df5 is {}'.format(df5))

# Stage 3: rename the "contents_flat" part of each column name back to
# "contents".
schema5=df5.schema.jsonValue()
columns_list5=flatten_schema(schema5)
print('columns list 5 is {}'.format(columns_list5))
df6 = df5.select(*(col(x).alias(x.replace('contents_flat','contents')) for x in columns_list5))
print('df6 is {}'.format(df6))

# Stage 4: re-select the flattened columns without renaming.
schema6=df6.schema.jsonValue()
columns_list6=flatten_schema(schema6)
print('column list 6 is {}'.format(columns_list6))
df7 = df6.select(*(col(x) for x in columns_list6)) #above line edited down here

# Stage 5: flatten once more, then explode the
# "business_address_latLng_warnings" column into a "_flat" column and drop
# the original.
schema7=df7.schema.jsonValue()
print('schema7 is {}'.format(schema7))
columns_list7=flatten_schema(schema7)
print('columns list 7 is {}'.format(columns_list7))
df7 = df7.select(*(col(x).alias(x.replace('.','_')) for x in columns_list7))
df7=df7.select("*",explode_outer(df7.business_address_latLng_warnings).alias("business_address_latLng_warnings_flat"))
df7=df7.drop("business_address_latLng_warnings")

print('df7 is {}'.format(df7))

# Record the source file of every row, then derive "filedate" from it: the
# text following 'short_date=' in the file name, truncated to its first 10
# characters. The temporary "shortfilename" column is dropped afterwards.
# NOTE(review): presumably the input paths contain a 'short_date=YYYY-MM-DD'
# segment; confirm against the real bucket layout.
df8 = df7.withColumn("filename",input_file_name())
split_col = split(df8['filename'], 'short_date=')
df9 = df8.withColumn('shortfilename', split_col.getItem(1))
df_final = df9.withColumn('filedate', substring('shortfilename',1,10)).drop('shortfilename')
print('df_final is {}'.format(df_final))

# Append the result as CSV with a header row.
# NOTE(review): '{bucket}' and '{folder1}' are not interpolated here (the
# string is neither an f-string nor passed through .format), so the literal
# text is used as the path; substitute the real destination before running.
df_final.write.mode('append').csv('s3://{bucket}/{folder1}/', header='true')

spark.stop()

标签: apache-spark, pyspark, amazon-iam, aws-glue, aws-sts

解决方案


推荐阅读