首页 > 技术文章 > hadoop的二次排序

52hadoop 2015-02-11 16:25 原文

package  com.qq;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key bean for Hadoop secondary sort.
 *
 * <p>Carries an account id plus income/expenses, with surplus derived as
 * {@code income - expenses}. Sort order (see {@link #compareTo}): income
 * descending; ties broken by expenses ascending.
 *
 * <p>NOTE(review): equals/hashCode are not overridden, so the ordering is
 * inconsistent with equals — acceptable for use as a MapReduce sort key,
 * but do not use instances in hash-based collections expecting value
 * semantics.
 */
public class InfoBean implements WritableComparable<InfoBean>{

    private String account;

    private double income;

    private double expenses;

    private double surplus;

    /**
     * Populates all fields in one call; surplus is derived, so this is the
     * preferred way to initialize the bean (the individual setters do NOT
     * recompute surplus).
     *
     * @param account  account identifier (must be non-null before {@link #write})
     * @param income   income amount
     * @param expenses expenses amount
     */
    public void set(String account, double income, double expenses){
        this.account = account;
        this.income = income;
        this.expenses = expenses;
        this.surplus = income - expenses;
    }

    @Override
    public String toString() {
        // Tab-separated so the reducer output lines up as columns;
        // account is intentionally excluded (it is written as the output key).
        return this.income + "\t" + this.expenses + "\t" + this.surplus;
    }

    /**
     * Serializes all fields. Field order must match {@link #readFields}.
     * NOTE(review): writeUTF throws NPE if account is null — callers must
     * call {@link #set} first.
     */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(account);
        out.writeDouble(income);
        out.writeDouble(expenses);
        out.writeDouble(surplus);
    }

    /**
     * Deserializes all fields in the exact order written by {@link #write}.
     */
    public void readFields(DataInput in) throws IOException {
        this.account = in.readUTF();
        this.income = in.readDouble();
        this.expenses = in.readDouble();
        this.surplus = in.readDouble();
    }

    /**
     * Orders by income descending; when incomes are equal, by expenses
     * ascending.
     *
     * <p>Uses {@link Double#compare} so the result is 0 when both fields are
     * equal and the comparison is antisymmetric — the original implementation
     * returned -1 from both {@code a.compareTo(b)} and {@code b.compareTo(a)}
     * for equal beans, violating the {@link Comparable} contract.
     */
    public int compareTo(InfoBean o) {
        // Arguments swapped to get descending order on income.
        int cmp = Double.compare(o.income, this.income);
        if (cmp == 0) {
            // Ascending order on expenses for equal incomes.
            cmp = Double.compare(this.expenses, o.expenses);
        }
        return cmp;
    }

    public String getAccount() {
        return account;
    }

    public void setAccount(String account) {
        this.account = account;
    }

    public double getIncome() {
        return income;
    }

    /** Sets income only; surplus is NOT recomputed — use {@link #set}. */
    public void setIncome(double income) {
        this.income = income;
    }

    public double getExpenses() {
        return expenses;
    }

    /** Sets expenses only; surplus is NOT recomputed — use {@link #set}. */
    public void setExpenses(double expenses) {
        this.expenses = expenses;
    }

    public double getSurplus() {
        return surplus;
    }

    public void setSurplus(double surplus) {
        this.surplus = surplus;
    }


}




package com.qq;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce driver for the secondary-sort step.
 *
 * <p>Input lines are CSV: {@code account,income,expenses}. The mapper emits
 * an {@link InfoBean} as the key (NullWritable value) so Hadoop's shuffle
 * sorts by the bean's compareTo (income descending, expenses ascending);
 * the reducer then writes {@code account -> bean}.
 */
public class SortStep {

    /**
     * Configures and runs the sort job.
     *
     * @param args args[0] = input path, args[1] = output path
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail fast with a usage message instead of an opaque
        // ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: SortStep <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortStep.class);
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(InfoBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        // A single reducer guarantees one globally-sorted output file.
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Propagate job success/failure to the process exit code; the
        // original discarded the boolean and always exited 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Parses each CSV line into an InfoBean and emits it as the key so the
     * shuffle phase performs the sort.
     */
    public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable>{

        // Reused across calls (standard Hadoop pattern); the framework
        // serializes the key on write, so mutation between calls is safe.
        private InfoBean bean = new InfoBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");
            String account = fields[0];
            double income = Double.parseDouble(fields[1]);        // income: sorted high to low
            double expenses = Double.parseDouble(fields[2]);      // expenses: sorted low to high
            bean.set(account, income, expenses);
            context.write(bean, NullWritable.get());
        }

    }


    /**
     * Emits {@code account -> bean}. Keys arrive already sorted by the
     * bean's compareTo, so output order is the final sorted order.
     */
    public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean>{
        private Text k = new Text();
        @Override
        protected void reduce(InfoBean bean, Iterable<NullWritable> v2s, Context context)
                throws IOException, InterruptedException {
            String account = bean.getAccount();
            k.set(account);
            context.write(k, bean);
        }

    }
}

数据文件格式  

sss,300,50
xxx,300,100
ggg,5000,30
hhh,1000,50
kkk,1000,20
aaa,2000,700
bbb,500,30

我们想对第二列（收入 income）降序排列，当第二列相同的时候，对第三列（支出 expenses）升序排列。运行后的输出如下：

ggg     5000.0  30.0    4970.0
aaa     2000.0  700.0   1300.0
kkk     1000.0  20.0    980.0
hhh     1000.0  50.0    950.0
bbb     500.0   30.0    470.0
sss     300.0   50.0    250.0
xxx     300.0   100.0   200.0

 

推荐阅读