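java - Apache Crunch unable to write output
Question
This may be an oversight on my part, but I can't work out why Apache Crunch does not write its output to a file. I'm writing a very simple program to learn Crunch.
Here is the code: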
import org.apache.crunch.Pipeline;
import org.apache.hadoop.conf.Configuration;
....
private Pipeline pipeline;
private Configuration etlConf;
....
this.etlConf = getConf();
this.pipeline = new MRPipeline(TestETL.class, etlConf);
....
// Read file
logger.info("Reading input file: " + inputFileURI.toString());
PCollection<String> input = pipeline.readTextFile(inputFileURI.toString());
System.out.println("INPUT SIZE = " + input.asCollection().getValue().size());
// Write file
logger.info("Writing Final output to file: " + outputFileURI.toString());
input.write(
To.textFile(outputFileURI.toString()),
WriteMode.OVERWRITE
);
This is the logging I see when I run the jar with hadoop:
18/12/31 09:41:51 INFO etl.TestClass: Executing Test run
18/12/31 09:41:51 INFO etl.TestETL: Reading input file: /user/sw029693/process_analyzer/input/input.txt
INPUT SIZE = 3
18/12/31 09:41:51 INFO etl.TestETL: Writing Final output to file: /user/sw029693/process_analyzer/output/occurences
18/12/31 09:41:51 INFO impl.FileTargetImpl: Will write output files to new path: /user/sw029693/process_analyzer/output/occurences
18/12/31 09:41:51 INFO etl.TestETL: Cleaning-up TestETL run
18/12/31 09:41:51 INFO etl.TestETL: ETL completed with status 0.
The input file is very simple and looks like this:
this is line 1
this is line 2
this is line 3
Although the log suggests that the write to the output location should have happened, I don't see any file being created. Any ideas?
Solution
package com.hadoop.crunch;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import org.apache.crunch.*;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.*;
import org.apache.log4j.Logger;

public class App extends Configured implements Tool, Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = Logger.getLogger(App.class);

    @Override
    public int run(String[] args) throws Exception {
        final Path fileSource = new Path(args[0]);
        final Path outFileName = new Path(args[1], "event-" + System.currentTimeMillis() + ".txt");

        // MRPipeline translates the overall pipeline into one or more MapReduce jobs
        Pipeline pipeline = new MRPipeline(App.class, getConf());

        // Specify the input data to the pipeline; it is held in a PCollection
        PCollection<String> inDataPipe = pipeline.read(From.textFile(fileSource));

        // Ask for the PCollection as an in-memory Collection;
        // calling getValue() below is what forces the pipeline to run
        PObject<Collection<String>> dataCollection = inDataPipe.asCollection();

        // Note: getLocal() returns the local file system, so the file is not written to HDFS
        FileSystem fs = FileSystem.getLocal(getConf());

        // Write each line by hand; try-with-resources closes the writer even on failure
        try (BufferedWriter bufferedWriter = new BufferedWriter(
                new OutputStreamWriter(fs.create(outFileName, true), StandardCharsets.UTF_8))) {
            for (String data : dataCollection.getValue()) {
                bufferedWriter.write(data);
                bufferedWriter.newLine();
            }
        }

        // Run any remaining pipeline work and clean up intermediate files
        PipelineResult result = pipeline.done();
        return result.succeeded() ? 0 : 1;
    }

    public static void main(String[] args) {
        if (args.length != 2) throw new RuntimeException("Usage: hadoop jar <inputPath> <outputPath>");
        try {
            // Propagate the status returned by run() as the process exit code
            System.exit(ToolRunner.run(new Configuration(), new App(), args));
        } catch (Exception e) {
            LOG.error(e.getLocalizedMessage(), e);
            System.exit(1);
        }
    }
}
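Usage: run it as a Java program with two arguments. The first argument is the input file name or directory, and the second is the output directory. The output file is named event-<timestamp>.txt. Remember to put a space between args[0] and args[1], for example: /user/sw029693/process_analyzer/input/input.txt /user/sw029693/process_analyzer/input/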
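For context on why the question's code produced no file: Crunch pipelines are executed lazily. Calling write() only registers an output target, and the jobs that produce it run when pipeline.run() or pipeline.done() is called. The question's snippet shows no such call after input.write(...), which matches the log: the target is announced but no job follows. The sketch below is a toy model of that behaviour using only the JDK. ToyPipeline and its methods are hypothetical names invented for illustration and are not part of Crunch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of deferred execution. This is NOT Crunch's implementation;
// ToyPipeline is a hypothetical stand-in used only to show the idea.
public class LazyPipelineSketch {

    static final class ToyPipeline {
        // Writes that have been requested but not yet executed
        private final Map<String, List<String>> pendingWrites = new LinkedHashMap<>();
        // Stand-in for the file system: path -> lines
        private final Map<String, List<String>> fileSystem = new LinkedHashMap<>();

        // Only records the request; nothing reaches the "file system" yet
        void write(List<String> lines, String path) {
            pendingWrites.put(path, new ArrayList<>(lines));
        }

        // Executes every pending write, then clears the queue
        boolean done() {
            fileSystem.putAll(pendingWrites);
            pendingWrites.clear();
            return true;
        }

        boolean exists(String path) {
            return fileSystem.containsKey(path);
        }

        List<String> read(String path) {
            return fileSystem.get(path);
        }
    }

    public static void main(String[] args) {
        ToyPipeline pipeline = new ToyPipeline();
        List<String> input = Arrays.asList("this is line 1", "this is line 2", "this is line 3");

        pipeline.write(input, "output/occurences");
        System.out.println("exists before done(): " + pipeline.exists("output/occurences")); // false

        pipeline.done();
        System.out.println("exists after done(): " + pipeline.exists("output/occurences")); // true
    }
}
```

Applied to the question's code, the equivalent change would probably be to call pipeline.done() after input.write(...) and check the returned PipelineResult, rather than writing the lines by hand.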