首页 > 解决方案 > fs.s3 配置有两个带 EMR 的 s3 帐户

问题描述

我有使用 lambda 和 EMR 的管道,我从一个 s3 账户 A 读取 csv,并将 parquet 写入账户 B 中的另一个 s3。我在账户 B 中创建了 EMR,并且可以访问账户 B 中的 s3。我无法添加账户 A s3 存储桶访问权限在 EMR_EC2_DefaultRole 中(因为这个帐户是企业范围的数据存储),所以我使用 accessKey,密钥来访问帐户 A s3 存储桶。这是通过 congnito 令牌完成的。

方法1

我正在使用 fs.s3 协议从帐户 A 从 s3 读取 csv 并在帐户 B 上写入 s3。我有 pyspark 代码,它从 s3 (A) 读取并写入 parquet s3 (B) 我一次提交作业 100 .这个 pyspark 代码在 EMR 中运行。

使用以下设置读取

hadoop_config = sc._jsc.hadoopConfiguration()
hadoop_config.set("fs.s3.awsAccessKeyId", dl_access_key)
hadoop_config.set("fs.s3.awsSecretAccessKey", dl_secret_key)
hadoop_config.set("fs.s3.awsSessionToken", dl_session_key)

spark_df_csv = spark_session.read.option("Header", "True").csv("s3://somepath")

写作:

我正在使用 s3a 协议s3a://some_bucket/

它有效,但有时我看到

  1. _temporary 文件夹存在于 s3 存储桶中,并非所有 csv 都转换为镶木地板
  2. 当我将 EMR 并发启用为 256 (EMR-5.28) 并提交 100 个作业时,我得到 _temporary 重命名错误。

问题:

  1. 此方法创建临时文件夹,有时它不会删除它。我可以在 s3 存储桶中看到 _temporary 文件夹。
  2. 当我启用 EMR 并发(EMR 最新 versin5.28)它允许并行运行步骤时,我得到一些文件的重命名 _temporary 错误。

方法2:

我觉得 s3a 不适合并行工作。所以我想使用fs.s3读写,因为它有更好的文件提交者。

所以我最初这样做了,我将上面的 hadoop 配置设置为帐户 A,然后取消设置配置,以便它最终可以访问默认帐户 B s3 存储桶。这样

hadoop_config = sc._jsc.hadoopConfiguration()
hadoop_config.unset("fs.s3.awsAccessKeyId")
hadoop_config.unset("fs.s3.awsSecretAccessKey")
hadoop_config.unset("fs.s3.awsSessionToken")


spark_df_csv.repartition(1).write.partitionBy(['org_id', 'institution_id']). \
    mode('append').parquet(write_path)

问题

这可行,但问题是如果我触发 lambda,而后者又为 100 个文件(循环中)提交作业,大约 10 个奇数文件导致在将文件写入 s3 存储桶时访问被拒绝。

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n ... 1 更多\n原因:com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3 .model.AmazonS3Exception:访问被拒绝(服务:

这可能是因为此未设置有时不起作用,或者是因为并行运行 Spark 上下文/会话设置未设置在并行中发生?我的意思是,一个作业的 spark 上下文正在扰乱 hadoop 配置,而另一个正在设置它,这可能会导致这个问题,尽管不确定 spark 上下文是如何并行工作的。

不是每个作业都有单独的 Spark 上下文和会话。请针对我的情况提出替代方案。

标签: apache-sparkamazon-s3pysparkamazon-emr

解决方案


推荐阅读