scala - AWS EMR 步骤和 HDFS 合并;Scala Shell 命令`
问题描述
我正在使用 AWS EMR 运行我们的一些 spark 程序。数据湖建立在 S3 之上,具有原始层和精选层。Spark 程序从原始区域提取数据并进行一些转换以放入策划层。我首先将结果本地存储在 HDFS 上,然后对 S3 进行 distcp;
一直以来,我们都是使用 SSH 登录 EMR 并部署 jar 并运行;然而,在生产中,我们的客户拒绝使用 SSH。我们对程序进行了更改,使其使用以下命令按步骤运行
aws emr add-steps --cluster-id j-CXXXXXXXXXJ --steps Type=Spark,Name="RUN MJ",ActionOnFailure=CONTINUE,Args=[--class,sparkMultiJoins.GenomeSparkJoins,s3://marketing-analytics-platform/genome_install/libs/SparkFrameWork-0.0.1-SNAPSHOT-jar-with-dependencies.jar,dbo.Property,abc,genomedb,genomedb];
我能够成功运行该程序;但是在执行 shell 命令以从 scala 进行 HDFS 合并时,我遇到了问题;下面是合并的代码。
val MergeFiles = "hadoop fs -text " + HDFSOutPath + "/part* | hadoop fs -put - " + HDFSOutPath +"/"+fileName+".csv"
//////execute the dynamically constructed command
MergeFiles.!
参数替换后的示例输出如下所示
hadoop fs -cat hdfs://ip-10-11-111-111.xxx.com:8020/StageData/Property/* | hadoop fs -put - hdfs://ip-10-11-111-111.pnmac.com:8020/StageData/Property/Property.csv
在执行上述操作时,它会将 cat 命令的输出(即文件上的所有数据)扔到标准输出上,即可以在 aws 日志的标准输出上看到它。出于这个原因,作业永远执行;如果我在 emr 集群上执行 hadoop fs(上面的命令),我期待它的行为方式;
请注意,我已经尝试使用 coalesce(1) 方法进行合并并放弃它,因为它非常慢。
还有什么我可以做的吗?
解决方案
推荐阅读
- python - JSONDecodeError:期望值:第 7 行第 1 列(字符 6)
- python - 如何在 django 中用博客文章标记评论?
- regular-language - 给定自动机的正则表达式
- java - 使用 jsoup get() 函数时出现 IndexOutOfBoundsException
- android - 如何在自身内部设置自定义视图高度的一半宽度?
- sql - 如何在 oracle sql 中再次使用 MAX 和 MIN 函数获取值?
- javascript - 使用 PHP 或 Javascript 为 Zebra 打印机将 RGB 图像转换为 Floyd-Steinberg 图像
- neo4j - 如何维护 org.neo4j.ogm.session.SessionFactory 对象
- karate - 如何轻松升级空手道版本?
- java - 如何正确使用可以有条件地返回值或抛出异常的 Java 8 Optional?