首页 > 解决方案 > 无法从 AWS EMR 中的 Flink 访问 s3 文件

问题描述

我们在 AWS EMR 上有一个长期运行的 Flink 集群。它配置有默认角色 (EMR_EC2_DefaultRole)。我们尝试运行 Flink 作业,但它无法访问 s3 存储桶来读取文件。我们创建了最小的 main 方法代码来重现它:

String filePath = "s3://<our-bucket>/<the-file>";
logger.info("Path: " + filePath);
Path path = Paths.get(filePath);
logger.info("Successfully got path");
File file = path.toFile();
logger.info("Successfully got creds file");
logger.info("Exists [{}], isFile [{}] ", file.exists(), file.isFile());
String content = FileUtils.readFileToString(file);
logger.info("Content [{}]", content);

我们通过 Flink Web UI 运行 Flink 作业。我们得到除了日志之外的所有Content日志。

存在的日志是:Exists [false], isFile [false]

我们还收到以下错误:

Caused by: java.io.FileNotFoundException: File 's3:/<our-bucket>/<the-file>' does not exist
    at org.apache.commons.io.FileUtils.openInputStream(FileUtils.java:299)
    at org.apache.commons.io.FileUtils.readFileToString(FileUtils.java:1711)
    at org.apache.commons.io.FileUtils.readFileToString(FileUtils.java:1748)
    at com.<our-package>.Main.main(Main.java:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    ... 10 more

当我们 ssh 到主 EC2 实例并运行以下命令时,它会工作并返回文件内容:

sudo hdfs dfs -cat s3://<our-bucket>/<the-file>

请帮忙 :)

标签: amazon-s3apache-flinkamazon-emr

解决方案


看起来您正在尝试将 S3 路径传递给org.apache.commons.io.FileUtils.readFileToString(),我认为这不会起作用。

您可以从该 S3 路径创建 FlinkPath并使用它来创建输入流,例如

Path = new Path("s3://<our-bucket>/<the-file>");
FileSystem fs = filePath.getFileSystem();
InputStream is = new DataInputStream(fs.open(filePath, readBufferSize));
String s = IOUtils.toString(is, charset);

推荐阅读