
jimmy888 2020-08-25 23:55

Hadoop (15): MapReduce Programming Case 2 (Per-Phone Upstream/Downstream Traffic)

Writable is Hadoop's serialization format; Hadoop defines the Writable interface for this purpose. A class only needs to implement this interface to be serializable.

Writable also has a sub-interface, WritableComparable, which supports both serialization and key comparison. By implementing WritableComparable on a custom key, we can implement our own sort order, as the sketch below illustrates.
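This case only needs Writable, since no custom key sorting is required; still, for reference, a minimal sketch of a WritableComparable key might look like the following. The class name PhoneKey and its field are hypothetical and not used in this case:

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

//Minimal sketch of a custom WritableComparable key (hypothetical, not used below)
public class PhoneKey implements WritableComparable<PhoneKey> {
    private String phoneNum;

    //No-arg constructor, required for deserialization
    public PhoneKey() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNum = in.readUTF();
    }

    @Override
    public int compareTo(PhoneKey other) {
        //compareTo defines the sort order applied during the shuffle
        return phoneNum.compareTo(other.phoneNum);
    }
}

In a real job, such a key would usually also override hashCode and equals so the default HashPartitioner groups equal keys consistently.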

In enterprise development, the built-in serialization types often cannot meet every need; for example, to pass a Java bean object inside the Hadoop framework, that object must implement the serialization interface.

Steps to create a serializable Java bean class:

  1. Create a class that implements the Writable interface.
  2. Override void write(DataOutput out) and void readFields(DataInput in).
  3. Override String toString().
  4. Provide a no-arg constructor; deserialization creates the object via reflection, which requires it.

Case Requirements

Given data like the following, compute for each phone number the sum of upstream packets, the sum of downstream packets, the total upstream traffic, and the total downstream traffic. Fields are separated by tabs (\t), and the second column is the phone number.

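The original post showed the sample data as a screenshot, which is not reproduced here. Judging from the map logic in step 3 (which reads array indexes 1, 6, 7, 8 and 9 after splitting on \t), a record looks roughly like the hypothetical line below:

1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	example.com	200	24	27	2481	24681

Here index 1 is the phone number, index 6 the upstream packet count (24), index 7 the downstream packet count (27), index 8 the upstream traffic (2481), and index 9 the downstream traffic (24681).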

Case Analysis

(The original analysis diagram is not reproduced here. In outline: the Mapper parses each tab-separated line and emits the phone number as the key and a FlowBean wrapping the four values as the value; the shuffle groups records by phone number; the Reducer sums the four fields for each phone number and writes the totals.)

Step 1: Create a Maven project

Same as in the wordcount case 1, so the setup is not repeated here.
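For reference, the project mainly needs the Hadoop client dependency in pom.xml. A minimal sketch follows; the version number is an assumption and should match your cluster:

<dependencies>
    <!-- minimal sketch; use the version that matches your cluster -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
    </dependency>
</dependencies>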

Step 2: Define a serializable Java bean class

This class wraps the four values (upstream and downstream packet counts, and upstream and downstream traffic totals); it implements the Writable interface, marking it as serializable.

package com.bean;

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    private Integer upFlow; //upstream packets
    private Integer downFlow; //downstream packets
    private Integer upCountFlow; //total upstream traffic
    private Integer downCountFlow; //total downstream traffic

    //No-arg constructor, used during deserialization
    public FlowBean() {

    }

    //Serialization method:
    @Override
    public void write(DataOutput out) throws IOException {
        //The serialized types must match the source data types; the four values are all ints.
        out.writeInt(upFlow);
        out.writeInt(downFlow);
        out.writeInt(upCountFlow);
        out.writeInt(downCountFlow);
    }

    //Deserialization method:
    @Override
    public void readFields(DataInput in) throws IOException {
        //The deserialized types must also match the source data types.
        //The deserialization order must exactly match the serialization order!
        //Order: upFlow -> downFlow -> upCountFlow -> downCountFlow
        this.upFlow = in.readInt();
        this.downFlow = in.readInt();
        this.upCountFlow = in.readInt();
        this.downCountFlow = in.readInt();
    }

    //getters and setters
    public Integer getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    public Integer getUpCountFlow() {
        return upCountFlow;
    }

    public void setUpCountFlow(Integer upCountFlow) {
        this.upCountFlow = upCountFlow;
    }

    public Integer getDownCountFlow() {
        return downCountFlow;
    }

    public void setDownCountFlow(Integer downCountFlow) {
        this.downCountFlow = downCountFlow;
    }


    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", upCountFlow=" + upCountFlow +
                ", downCountFlow=" + downCountFlow +
                '}';
    }
}
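Because write and readFields must agree field for field, a quick local round-trip check can catch ordering mistakes early. This is a minimal sketch, not part of the original article:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean before = new FlowBean();
        before.setUpFlow(24);
        before.setDownFlow(27);
        before.setUpCountFlow(2481);
        before.setDownCountFlow(24681);

        //Serialize to a byte array
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        before.write(new DataOutputStream(bos));

        //Deserialize; readFields must consume the fields in the same order
        FlowBean after = new FlowBean();
        after.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(after); //should print the same four values
    }
}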

Step 3: Define the map logic

package com.bean;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

//Input: LongWritable, Text
//Output: Text, FlowBean
//Note in particular that the output value is our own serializable type FlowBean, which wraps the four values
public class MyMap extends Mapper<LongWritable,Text,Text,FlowBean> {
    private FlowBean flowBean;
    private Text text;

    //Override setup for initialization; it runs once before a map task starts processing
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        flowBean = new FlowBean();
        text = new Text();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //Split one line of data into an array of fields
        String[] split = value.toString().split("\t");
        //Extract the phone number and the four values:
        String phoneNum = split[1];
        String upFlow =split[6];
        String downFlow =split[7];
        String upCountFlow =split[8];
        String downCountFlow =split[9];

        //Set the content of the output key:
        text.set(phoneNum);
        //Set the content of the output value:
        flowBean.setUpFlow(Integer.parseInt(upFlow));
        flowBean.setDownFlow(Integer.parseInt(downFlow));
        flowBean.setUpCountFlow(Integer.parseInt(upCountFlow));
        flowBean.setDownCountFlow(Integer.parseInt(downCountFlow));
        //Emit the key-value pair to the shuffle phase:
        context.write(text,flowBean);
    }
}
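For the hypothetical record shown under Case Requirements, this mapper emits one pair like the following (illustrative only):

13726230503 -> FlowBean{upFlow=24, downFlow=27, upCountFlow=2481, downCountFlow=24681}

Reusing the single flowBean instance created in setup is safe here, because context.write serializes the object's current contents immediately.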

Step 4: Define the reduce logic

package com.bean;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class MyReduce extends Reducer<Text,FlowBean,Text,Text> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        int upFlow = 0;
        int downFlow = 0;
        int upCountFlow = 0;
        int downCountFlow = 0;
        for (FlowBean value : values) {
            upFlow += value.getUpFlow();
            downFlow += value.getDownFlow();
            upCountFlow += value.getUpCountFlow();
            downCountFlow += value.getDownCountFlow();
        }
        context.write(key, new Text(upFlow + "\t" + downFlow + "\t" + upCountFlow + "\t" + downCountFlow));
    }
}
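The output value type is Text rather than FlowBean, so each result line is plain tab-separated text. If the hypothetical phone number above appeared in two identical records, the output line would look like this (values illustrative):

13726230503	48	54	4962	49362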

Step 5: Create the driver class and define the main method

package com.bean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Assem extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //Get the job object
        Job job = Job.getInstance(super.getConf(), "flowCount");
        //This line is required when the program is packaged and run as a jar
        job.setJarByClass(Assem.class);


        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///..."));
        //TextInputFormat.addInputPath(job,new Path(args[0])); //use this form to take the input path from the command line

        job.setMapperClass(MyMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("file:///..."));
        //TextOutputFormat.setOutputPath(job,new Path(args[1])); //use this form to take the output path from the command line

        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }
	//Program entry point:
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new Assem(), args);
        System.exit(run);
    }
}
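With the commented-out args[0]/args[1] lines enabled instead of the hardcoded paths, the packaged job could be run roughly as follows; the jar name and paths are hypothetical:

hadoop jar flowcount.jar com.bean.Assem /input/flow /output/flow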
