首页 > 技术文章 > mapreduce流程

hatcher-h 2020-05-28 22:00 原文

mapTask运行机制

mapTask并行度:同时存在几个mapTask
TextInputFormat中的getSplits方法返回的是切片数目,有多少切片就有几个mapTask。

获取文件的切片的几个参数控制:
mapred.min.split.size  没有配置的话默认值是1

mapred.max.split.size  没有配置的话默认值是  Long.MAX_VALUE

如果没有配置上面这两个参数,我们文件的切片大小就是128M,与我们的block块相等
正常一个block块对应一个mapTask

mapTask流程

在这里插入图片描述

1、inputFormat读取文件数据,并解析为相对应的键值对k1,v1
2、自定义map程序,解析为我们想要的k2,v2
3、在写入环形缓冲区前会进入分区阶段,每个数据指定将要进入哪一个reduce
4、写入环形缓冲区。环形缓冲区默认大小为100M,当写入的数据到达80M的时候,会启动溢写。
5、在溢写的时候会对溢写数据进行排序,然后再写入磁盘的临时文件中。
6、当该mapTask数据写入完成后,所有临时文件会合并成一个大文件

mapTask基础设置配置

设置一:设置环型缓冲区的内存值大小(默认设置如下)
mapreduce.task.io.sort.mb 100

设置二:设置溢写百分比(默认设置如下)
mapreduce.map.sort.spill.percent 0.80

设置三:设置溢写数据目录(默认设置)
mapreduce.cluster.local.dir ${hadoop.tmp.dir}/mapred/local

设置四:设置一次最多合并多少个溢写文件(默认设置如下)
mapreduce.task.io.sort.factor 10

reduceTask机制

reduceTask流程

1、copy ,reduce开启线程拉取属于自己的数据
2、合并 ,将数据进行合并也就是分组
3、排序 reduce局部排序,每个reduce间不干扰
reduce的copy也是有个缓冲区,缓冲区类似与mapTask,缓冲区有限定值,满了以后存入磁盘

mapReduce全流程

在这里插入图片描述

Snappy压缩

可以配置文件中设置也可以代码中设置

reduce端join算法

reduce太少:每个reduce压力太大
reduce太多:资源浪费

map端join算法

去掉reduce
采用缓存
package cn.nina.mr.demo6;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    //在程序主类运行方法里加了缓存文件,这里可以获取缓存文件
    Map<String,String> map = null;
    //重写setup方法获取缓存文件,将缓存文件内容存储到map当中去
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        map = new HashMap<String,String>();
        //从context中获取configuration
        Configuration configuration = context.getConfiguration();
        //只有一个缓存文件,所以可以直接取第一个
        URI[] cacheFiles = DistributedCache.getCacheFiles(configuration);
        URI cacheFile = cacheFiles[0];

        //获取文件系统
        FileSystem fileSystem = FileSystem.get(cacheFile, configuration);
        //获取文件输入流,如何将流转换成字符串
        FSDataInputStream open = fileSystem.open(new Path(cacheFile));
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open));
        String line = null;
        while ((line = bufferedReader.readLine()) != null){
            String[] lineArray = line.split(",");
            map.put(lineArray[0],line);
        }

    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        //获取到商品表数据
        String product = map.get(split[2]);
        //将商品表和订单表进行拼接
        context.write(new Text(value.toString()+"\t"+product),NullWritable.get());
    }
}

推荐阅读