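java - Hadoop MapReduce produces no output
Problem description
I am writing a Hadoop MapReduce program and running it on an AWS EMR cluster. However, I am not getting the expected output: the job reports that the Mapper stage produced no output at all.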
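My program takes its input from the output files of another M/R job, which contain Text and DoubleWritable pairs, i.e. name/revenue pairs. Each Mapper should emit the 5 records with the highest revenue, and the Reducer should collect the 40 records from the 8 mappers and produce the final top 5 name/revenue pairs.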
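The two-phase scheme described above (each mapper keeps a local top 5, and the reducer picks the global top 5 from the up to 40 candidates) can be checked outside Hadoop. The following is a minimal plain-Java sketch, assuming in-memory lists stand in for input splits. The class `TwoPhaseTopN`, the `Pair` type and the sample data are hypothetical, and the example is scaled down to 3 splits and a top 2 so the result is easy to verify by hand. It uses the same bounded min-heap idea as the question's `PriorityQueue`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class TwoPhaseTopN {
    // Hypothetical name/revenue pair, mirroring the Tuple class in the question.
    static final class Pair {
        final String name;
        final double revenue;

        Pair(String name, double revenue) {
            this.name = name;
            this.revenue = revenue;
        }

        @Override
        public String toString() {
            return name + "=" + revenue;
        }
    }

    // Keep the n largest revenues with a min-heap of size n:
    // the head is the smallest value kept so far, so it is evicted first.
    static List<Pair> topN(List<Pair> input, int n) {
        PriorityQueue<Pair> heap =
                new PriorityQueue<>(Comparator.comparingDouble((Pair p) -> p.revenue));
        for (Pair p : input) {
            heap.add(p);
            if (heap.size() > n) {
                heap.poll(); // drop the current minimum
            }
        }
        // Drain in ascending order, then reverse so the largest comes first.
        List<Pair> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            result.add(heap.poll());
        }
        Collections.reverse(result);
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical data split across 3 "mappers" (the question has 8 and a top 5).
        List<List<Pair>> splits = List.of(
                List.of(new Pair("a", 10), new Pair("b", 70), new Pair("c", 30)),
                List.of(new Pair("d", 90), new Pair("e", 20)),
                List.of(new Pair("f", 50), new Pair("g", 60), new Pair("h", 40)));

        // Phase 1 (map side): each split emits only its local top 2.
        List<Pair> candidates = new ArrayList<>();
        for (List<Pair> split : splits) {
            candidates.addAll(topN(split, 2));
        }
        // Phase 2 (reduce side): the global top 2 is the top 2 of all candidates.
        System.out.println(topN(candidates, 2));
    }
}
```

Running it prints `[d=90.0, b=70.0]`. The scheme is safe because any record in the global top N must also be in the top N of its own split, so discarding the rest on the map side loses nothing.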
The Mapper's input files are the (Text, DoubleWritable) output of a Hadoop M/R job, and my program's output should also be (Text, DoubleWritable).
Here is my source code.
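The mapper's `StringTokenizer(value.toString(), "\t")` implies that the earlier job wrote plain text with `TextOutputFormat`, whose default separator between key and value is a tab. Under that assumption, a defensive parser for one input line might look like the sketch below. `RevenueLineParser` and `RevenueEntry` are hypothetical names, and the sample lines are made up. Blank or non-numeric lines are skipped rather than allowed to throw inside the mapper.

```java
import java.util.Optional;

public class RevenueLineParser {
    // Hypothetical parsed (name, revenue) pair from one line of the previous job's output.
    static final class RevenueEntry {
        final String name;
        final double revenue;

        RevenueEntry(String name, double revenue) {
            this.name = name;
            this.revenue = revenue;
        }
    }

    // Assumes lines of the form "name<TAB>revenue" (TextOutputFormat with its default separator).
    // Splitting on the last tab keeps the numeric part intact even if a name contained a tab.
    static Optional<RevenueEntry> parse(String line) {
        int tab = line.lastIndexOf('\t');
        if (tab <= 0 || tab == line.length() - 1) {
            return Optional.empty(); // blank or malformed line: skip instead of throwing
        }
        try {
            double revenue = Double.parseDouble(line.substring(tab + 1).trim());
            return Optional.of(new RevenueEntry(line.substring(0, tab), revenue));
        } catch (NumberFormatException e) {
            return Optional.empty(); // non-numeric revenue: skip
        }
    }

    public static void main(String[] args) {
        String[] lines = {"alice\t1250.5", "", "bob\tn/a", "carol\t980.0"};
        for (String line : lines) {
            System.out.println(parse(line)
                    .map(e -> e.name + " -> " + e.revenue)
                    .orElse("skipped"));
        }
    }
}
```

For the sample lines in `main`, this prints `alice -> 1250.5`, `skipped`, `skipped`, `carol -> 980.0`.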
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Top5Count extends Configured implements Tool {

    static int printUsage() {
        System.out.println("top5count [-r <reduces>] <input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    // A name/revenue pair ordered by revenue, so PriorityQueue<Tuple> is a min-heap on revenue.
    public static class Tuple implements Comparable<Tuple> {
        String id;
        double revenue;

        public Tuple(String id, double revenue) {
            this.id = id;
            this.revenue = revenue;
        }

        @Override
        public String toString() {
            return id + "," + revenue;
        }

        @Override
        public int compareTo(Tuple o) {
            return Double.compare(this.revenue, o.revenue);
        }
    }

    public static class Top5CountMapper
            extends Mapper<Object, Text, IntWritable, Text> {
        private final Text word = new Text();
        private final IntWritable one = new IntWritable(1);
        // This mapper's local top 5; the smallest revenue is at the head of the queue.
        private final PriorityQueue<Tuple> queue = new PriorityQueue<>();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line is expected to be "name<TAB>revenue" (text output of the previous job).
            StringTokenizer itr = new StringTokenizer(value.toString(), "\t");
            if (itr.countTokens() < 2) {
                return; // skip blank or malformed lines
            }
            String id = itr.nextToken();
            double revenue = Double.parseDouble(itr.nextToken());
            queue.add(new Tuple(id, revenue));
            // Evict the smallest entries so at most 5 remain.
            while (queue.size() > 5) {
                queue.poll();
            }
        }

        // Called once after the whole input split has been mapped: emit the local top 5.
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            while (!queue.isEmpty()) {
                word.set(queue.poll().toString());
                // The constant key 1 sends every candidate to the same reduce call.
                context.write(one, word);
            }
        }
    }

    public static class Top5CountReducer
            extends Reducer<IntWritable, Text, Text, DoubleWritable> {
        private final PriorityQueue<Tuple> queue = new PriorityQueue<>();
        private final Text id = new Text();
        private final DoubleWritable revenue = new DoubleWritable();

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Merge the local top-5 lists ("id,revenue" strings) from all mappers.
            for (Text val : values) {
                StringTokenizer itr = new StringTokenizer(val.toString(), ",");
                String name = itr.nextToken();
                double rev = Double.parseDouble(itr.nextToken());
                queue.add(new Tuple(name, rev));
            }
            while (queue.size() > 5) {
                queue.poll();
            }
            // Emit the global top 5 in ascending order of revenue.
            while (!queue.isEmpty()) {
                Tuple tuple = queue.poll();
                id.set(tuple.id);
                revenue.set(tuple.revenue);
                context.write(id, revenue);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Use the Configuration that ToolRunner built from the generic options (-D, -conf, ...).
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "Task2_3");
        job.setJarByClass(Top5Count.class);
        job.setMapperClass(Top5CountMapper.class);
        job.setReducerClass(Top5CountReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        List<String> otherArgs = new ArrayList<>();
        for (int i = 0; i < args.length; ++i) {
            try {
                if ("-r".equals(args[i])) {
                    job.setNumReduceTasks(Integer.parseInt(args[++i]));
                } else {
                    otherArgs.add(args[i]);
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from " + args[i - 1]);
                return printUsage();
            }
        }
        // Make sure there are exactly 2 parameters left.
        if (otherArgs.size() != 2) {
            System.out.println("ERROR: Wrong number of parameters: "
                    + otherArgs.size() + " instead of 2.");
            return printUsage();
        }
        FileInputFormat.setInputPaths(job, otherArgs.get(0));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner calls run() on whichever Tool it is given, so this must be a Top5Count.
        int res = ToolRunner.run(new Configuration(), new Top5Count(), args);
        System.exit(res);
    }
}
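Between the two phases, each candidate travels as `Text` in the form `id,revenue` (from `Tuple.toString()`), and the reducer splits it with `StringTokenizer(val.toString(), ",")`. If a name could itself contain a comma (an assumption; the question does not say), that split would pass part of the name to `Double.parseDouble` and throw. Below is a small sketch of a round trip that splits on the last comma instead. `IntermediateEncoding` and its methods are hypothetical helpers, not part of the question's code.

```java
public class IntermediateEncoding {
    // Encode the same way the question's Tuple.toString() does: "id,revenue".
    static String encode(String id, double revenue) {
        return id + "," + revenue;
    }

    // Decode by splitting on the LAST comma, so an id that itself
    // contains a comma (e.g. "Smith, John") is not cut short.
    static String decodeId(String encoded) {
        return encoded.substring(0, encoded.lastIndexOf(','));
    }

    static double decodeRevenue(String encoded) {
        return Double.parseDouble(encoded.substring(encoded.lastIndexOf(',') + 1));
    }

    public static void main(String[] args) {
        String encoded = encode("Smith, John", 1234.56);
        System.out.println(encoded);                // Smith, John,1234.56
        System.out.println(decodeId(encoded));      // Smith, John
        System.out.println(decodeRevenue(encoded)); // 1234.56
    }
}
```

Java's `Double.toString` output parses back to the same `double` with `Double.parseDouble`, so the revenue survives the text round trip unchanged.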