分区
相同key的value去同一个reduce
分区后需要指定reduceTask个数
package pation;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PationOwn extends Partitioner<Text, NullWritable> {
@Override
public int getPartition(Text text, NullWritable nullWritable, int i) {
String[] split = text.toString().split("\t");
if (Integer.parseInt(split[5]) > 15) {
return 0;
} else {
return 1;
}
}
}
排序
序列化、反序列化
hadoop当中没有沿用java序列化serialize方式,使用的是writable接口,实现了writable接口就可以序列化
如果需要序列化,需要实现 writable接口
如果需要排序 需要实现 comparable接口
如果既需要序列化,也需要排序 可以实现 writable和comparable 或者 WritableComparable
package cn.nina.mr.demo2;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class K2Bean implements WritableComparable<K2Bean> {
private String first;
private int second;
//比较排序
@Override
public int compareTo(K2Bean o) {
//首先比较第一个字段,相同再比较第二个字段
int i = this.first.compareTo(o.first);
if (i == 0){
int i1 = Integer.valueOf(this.second).compareTo(Integer.valueOf(o.second));
return i1;
}else{
//直接将比较结果返回
return i;
}
}
//序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(first);
out.writeInt(second);
}
//反序列化
@Override
public void readFields(DataInput in) throws IOException {
this.first = in.readUTF();
this.second = in.readInt();
}
public String getFirst() {
return first;
}
public void setFirst(String first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
@Override
public String toString() {
return first + '\t' + second;
}
}
规约
mapreduce当中的规约的过程
规约combiner:是发生在map端的一个小型的reduce,接收我们的k2 v2 输出k2 v2
combiner不能够改变我们的数据的结果值,只是用于调优,减少发送到reduce端的数据量.
计数器
Counter counter = context.getCounter("MR_COUNT", "MapRecordCounter");
counter.increment(1L);