首页 > 解决方案 > Hadoop MapReduce 无输出

问题描述

我正在编写一个 hadoop MapReduce 程序并在 AWS EMR 集群上运行它。但是我无法获得预期的输出,它表明在 Mapper 阶段根本没有输出。

我的程序应当接收另一个 M/R 作业输出的文件,其内容是 (Text, DoubleWritable) 形式的名称/收入对。Mapper 应当输出本地收入最高的前 5 条记录;Reducer 再从 8 个 Mapper 共输出的 40 条记录中,选出最终全局收入最高的 top 5 名称/收入对。

Mapper 的输入文件是来自 hadoop M/R 作业的 (Text,DoubleWritable)。我的程序的输出也应该是 (Text,DoubleWritable)

这是我的源代码。

public class Top5Count extends Configured implements Tool{
    
    static int printUsage() {
        System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    public static class Tuple implements Comparable<Tuple>{
        String id;
        double revenue;

        public Tuple (String id, double revenue) {
            this.id = id;
            this.revenue = revenue;
        }
        public String toString() {
            return id + "," + revenue;
        }


        @Override
        public int compareTo(Tuple o) {
            if (this.revenue > o.revenue)
                return 1;
            else if (this.revenue == o.revenue)
                return 0;
            else
                return -1;
        }
    }

    public static class Top5CountMapper
            extends Mapper<Object, Text, IntWritable, Text> {

        private final Text word = new Text();
        private final IntWritable one = new IntWritable(1);
        private final PriorityQueue<Tuple> queue = new PriorityQueue<>();


        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString(), "\t");
            String id = itr.nextToken();
            double revenue = Double.parseDouble(itr.nextToken());
            queue.add(new Tuple(id, revenue));
            while (queue.size() > 5) {
                queue.poll();
            }
        }

        public void cleanup(Context context) throws IOException, InterruptedException {
            while (!queue.isEmpty()) {
                word.set(queue.poll().toString());
                context.write(one, word);
            }
        }
    }

    public static class Top5CountReducer
            extends Reducer<IntWritable,Text,Text,DoubleWritable> {

        private final PriorityQueue<Tuple> queue = new PriorityQueue<>();
        private final Text id = new Text();
        private final DoubleWritable revenue = new DoubleWritable();
        public void reduce(IntWritable key, Iterable<Text> values,
                           Context context
        ) throws IOException, InterruptedException {
            for (Text val : values) {
                StringTokenizer itr = new StringTokenizer(val.toString(), ",");
                String id = itr.nextToken();
                double revenue = Double.parseDouble(itr.nextToken());
                queue.add(new Tuple(id, revenue));
            }
            while (queue.size() > 5) {
                queue.poll();
            }
            while (!queue.isEmpty()) {
                Tuple tuple = queue.poll();
                id.set(tuple.id);
                revenue.set(tuple.revenue);
                context.write(id, revenue);
            }
        }
    }


    public int run(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Task2_3");
        job.setJarByClass(Top5Count.class);
        job.setMapperClass(Top5Count.Top5CountMapper.class);
        job.setReducerClass(Top5Count.Top5CountReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        List<String> other_args = new ArrayList<>();
        for(int i = 0; i < args.length; ++i) {
            try {
                if ("-r".equals(args[i])) {
                    job.setNumReduceTasks(Integer.parseInt(args[++i]));
                } else {
                    other_args.add(args[i]);
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from " +
                        args[i-1]);
                return printUsage();
            }
        }
        // Make sure there are exactly 2 parameters left.
        if (other_args.size() != 2) {
            System.out.println("ERROR: Wrong number of parameters: " +
                    other_args.size() + " instead of 2.");
            return printUsage();
        }
        FileInputFormat.setInputPaths(job, other_args.get(0));
        FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new RevenueCount(), args);
        System.exit(res);
    }
}

标签: java, amazon-web-services, hadoop, mapreduce

解决方案


推荐阅读