首页 > 技术文章 > Hadoop(16)输入类

jimmy888 2020-08-25 23:57 原文

Inputformat类

InputFormatmapreduce当中用于处理数据输入的一个组件,是最顶级的一个抽象父类,主要用于解决各个地方的数据源的数据输入问题。其中InputFormatUML类图可以通过idea进行查看(只有商业版本才有这个功能),如下图。

image-20200212171237897

查看类图的方式:进行要查看的类的源码,点击菜单栏:Navigate---》type Hierarchy--》

image-20200212171811141

FileInputformat类

FileInputFormat类是InputFormat的一个子类,如果需要操作hdfs上面的文件,基本上都是通过FileInputFormat类来实现的,我们可以通过FileInputFormat来实现各种格式的文件操作,FileInputFormat的子实现类的UML类图如下:

![image-20200212173217679](https://img2020.cnblogs.com/blog/1362875/202008/1362875-20200826234955425-1101383843.png)                           

FileInputFormat的子类:

类名 主要作用
TextInputFormat 读取文本文件
CombineFileInputFormat MR当中用于合并小文件,将多个小文件合并之后只需要启动一个mapTask进行运行
SequenceFileInputFormat 处理SequenceFile这种格式的数据
KeyValueTextInputFormat 通过手动指定分隔符,将每一条数据解析成为key,value对类型
NLineInputFormat 指定数据的行数作为一个切片
FixedLengthInputFormat 从文件中读取固定宽度的二进制记录

CombineTextInputFormat类

MapTask的个数由什么决定?--->切片

在运行我们的MapReduce程序的时候,我们可以清晰的看到会有多个mapTask的运行,那么maptask的个数究竟与什么有关,是不是maptask越多越好,或者说是不是maptask的个数越少越好呢?我们可以通过MapReduce的源码进行查看mapTask的个数究竟是如何决定的

MapReduce当中,每个mapTask处理一个切片split的数据量,因此,一个切片对应一个maptask

切片是什么?

切片与block块的概念很像,但是block块是HDFS当中存储数据的单位,切片splitMapReduce当中每个MapTask处理数据量的单位。

  • 一个切片对应一个maptask

  • 一个job(事务)的map阶段的并行度有提交job时的切片数

  • 切片默认大小=BlockSize

  • 切片不考虑数据集整体,而是针对每一个文件单独切片

  • 数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储

案例:

hdfs上面如果有以下两个文件,文件大小分别为300M12M,那么会启动多少个MapTask???

file1.txt  300M

file2.txt  10M

经过FileInputFormat的切片机制运算后,形成的切片信息如下:

file1.txt.split1-- 0~128M
file1.txt.split2-- 128~256M
file1.txt.split3-- 256~300M
file2.txt.split1-- 0~10M  //针对每一个文件单独切片,这个文件很小,但是会对应一个切片,一个maptask
//一共就会有四个切片,与我们block块的个数刚好相等

图解如下:

image-20200213022841631

从上图可以看到,当切片大小为100M时,这个Maptask2,不仅从本节点datanode2读取数据,还要从datanode2读取数据,意味着要进行网络传输,所以maptask所在节点一般最好跟所需的数据块的节点一致,合适的切片大小很关键

切片大小的设定

查看FileInputFormat的源码,点击Navigate--->File Structure,找到里面getSplits的方法,这个方法是获取所有的切片。

public List<InputSplit> getSplits(JobContext job) throws IOException {
        long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);
        List<InputSplit> splits = new ArrayList();
        ...
    	...
}

再找到computeSplitSize(),这个是切片大小的计算公式所在的方法:

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
    //Math.max(minSize, Math.min(maxSize, blockSize))这个就是切片计算公式
}
/*
计算公式中的minsize和maxsize分别是:
mapreduce.input.fileinputformat.split.minsize=1 //默认值为1 
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue //默认值Long.MAXValue(long类型的最大值) 
blockSize默认值为128M  
*/

image-20200213031047043

切片默认=blocksize128M,从切片计算公式可以知道,如果想要切片变成小于128M的就要将maxsize设置为小于128M的值,如果想要切片变成大于128M的就要将minsize设置为大于128M的值。

//改变minsize大小的方法的源码:
public static void setMinInputSplitSize(Job job, long size) {
    job.getConfiguration().setLong("mapred.min.split.size", size);
}
//改变maxsize大小的方法的源码:
public static void setMaxInputSplitSize(Job job, long size) {
    job.getConfiguration().setLong("mapred.max.split.size", size);
}

如何控制mapTask的个数

从上面知,一个很小的文件也会对应一个切片,从而占用一个MapTask。那如果有1000个小文件,每个小文件是1kb-100MB之间,那么我们启动1000个MapTask是否合适,该如何合理的控制MapTask的个数???

如果需要控制maptask的个数,我们只需要调整maxSizeminsize这两个值,那么切片的大小就会改变,切片大小改变之后,mapTask的个数就会改变。

但是,这种做法不能有效解决问题,最好将小文件进行合并规划处理,从而减少切片个数,减少maptask个数。

案例:实现maptask个数控制(小文件合并)

框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。

CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。

案例需求

有很多0-20M的小文件,要提交给集群计算处理,要保证maptask个数不要过多。

案例分析

有太多小文件了,要通过CombineTextInputFormat将多个小文件从逻辑上规划到一个切片中。

首先要修改最大值MaxInputSplitSize,设置如下:

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

注意:这个虚拟存储切片最大值最好根据实际的小文件大小情况来设置具体的值。详情应用看下面:

CombineTextInputFormat将多个小文件从逻辑上规划到一个切片中的详细过程(机制):

包括两个过程:

虚拟存储过程:

将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较。

  • 如果输入文件大小不大于设置的最大值,逻辑上划分一个块。

  • 如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块,剩下的平分为两块;

  • 如果输入文件超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(平均分,防止出现太小切片)。

  • 例如,如果setMaxInputSplitSize的值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M2.01M)两个文件。

    切片过程:

    判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。

    如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。

    示意图如下

image-20200213042641264

案例实现

把前面的wordCount实战案例中代码拉过来,修改组装类的第一步的代码即可。

// job.setInputFormatClass(TextInputFormat.class);

job.setInputFormatClass(CombineTextInputFormat.class);
//设置我们的输入类型为CombineTextInputFormat

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
//虚拟存储切片最大值设置4m,设置每个切片处理数据量为4M

打包运行,观察mapTask的个数

KeyValueTextInputFormat类

KeyValueTextInputFormat允许我们自己来定义分隔符,通过分隔符来自定义我们输入的keyvalue

案例需求

附件当中有下列这类型的数据,数据之间的分隔符为@zolen@ 数据内容如下

hello@zolen@ input datas today 
count@zolen@ hadoop spark
hello@zolen@ input some datas to test

要求将分隔符前面的内容作为输入键值对key,后面的作为输入键值对的value

期望最终的输出结果如下

hello  2
count  1

案例分析

输入给map阶段的键值对可序列化类型为:<Text,Text>

map阶段输出的键值对的可序列化类型为:<Text,LongWritable>

image-20200213053831014

步骤1:自定义map逻辑

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class KeyValueMapper extends Mapper<Text,Text,Text, LongWritable> {
    LongWritable outValue = new LongWritable(1);
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        context.write(key,outValue);

    }
}

步骤2:自定义reduce逻辑

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class KeyValueReducer extends Reducer<Text, LongWritable,Text,LongWritable> {


    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long result = 0;
        for (LongWritable value : values) {
            long l = value.get();
            result += l;
        }

        context.write(key,new LongWritable(result));

    }
}

步骤3:创建组装类和定义main方法

package com.jimmy.day02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class KeyValueMain  {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        //翻阅 KeyValueLineRecordReader 的源码,发现切割参数的配置
        conf.set("key.value.separator.in.input.line","@zolen@");
        Job job = Job.getInstance(conf);
        //第一步:读取文件,解析成为key,value对
        KeyValueTextInputFormat.addInputPath(job,new Path("file://F://test2"));

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        //第二步:设置mapper类
        job.setMapperClass(KeyValueMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //第三步到第六步 分区,排序,规约,分组

        //第七步:设置reducer类
        job.setReducerClass(KeyValueReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //第八步:输出数据
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("file:///..."));
        boolean b = job.waitForCompletion(true);
        System.exit(0);

    }

}

NlineInputFormat类

NLineInputFormat允许我们自己定义输入特定行数的内容作为一个切片数据。比如说,某文件有下列17行数据,如果我们定义3行内容作为一个切片数据,就会有6个切片,对应有6maptask,可打包到集群运行查看maptask个数。

hadoop spark kaikeba bigdata flink kafka mapreduce
hello world
hello flume
hadoop hive
hive kafka
flume storm
hive oozie
hello hello
world world
hadoop hadoop
hello world
hello flume
hadoop hive
hive kafka
flume storm
hive ooziehello hello
world world

案例需求

将上面内容做词频统计,每输入3行内容作为一个切片数据。

案例分析

wordcount案例没多大区别,只要稍微修改一下组装逻辑的代码即可。

案例代码

mapreduce、组装逻辑都整合在了一个类里,有内部类。

package com.jimmy.day03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class NLineMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
		
        Job job = Job.getInstance(configuration);
        job.setJarByClass(NLineMain.class);
		//设定输入类,设定3行为一个切片数据量
        NLineInputFormat.setNumLinesPerSplit(job,3);
        job.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.addInputPath(job,new Path("file:///..."));

        //第二步:自定义mapper类
        job.setMapperClass(NLineMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //第三步到第六步  分区,排序,规约,分组
        job.setReducerClass(NLineReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("file:///..."));
        job.waitForCompletion(true);

    }

    public static class NLineMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] s = value.toString().split(" ");
            for (String s1 : s) {
                context.write(new Text(s1),new LongWritable(1));

            }
        }
    }

    public static class NLineReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long result = 0L;
            for (LongWritable value : values) {
                result +=value.get();
            }
            context.write(key,new LongWritable(result));
        }
    }
}

打包到集群上运行时,从运行的输出信息可以看到,有6个maptask:

image-20200213170111238

推荐阅读