关于什么是二次排序
在mapreduce操作时,shuffle阶段会多次根据key值排序。但是在shuffle分组后,相同key值的values序列的顺序是不确定的(如下图)。如果想要此时value值也是排序好的,这种需求就是二次排序。
默认情况下,map输出的结果会对key进行默认的排序,但是有时候需要对key排序的同时还需要对value进行排序,这时候就要用到二次排序了。
mapreduce二次排序分析
我们把二次排序分为以下几个阶段
map起始阶段
在map阶段,使用job.setinputformatclass()定义的inputformat,将输入的数据集分割成小数据块split,同时inputformat提供一个recordreader的实现。在这里我们使用的是textinputformat,它提供的recordreader会将文本的行号作为key,这一行的文本作为value。这就是自定 mapper的输入是《longwritable,text》 的原因。然后调用自定义mapper的map方法,将一个个《longwritable,text》键值对输入给mapper的map方法
map最后阶段
在map阶段的最后,会先调用job.setpartitionerclass()对这个mapper的输出结果进行分区,每个分区映射到一个reducer。每个分区内又调用job.setsortcomparatorclass()设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setsortcomparatorclass()设置 key比较函数类,则使用key实现的compareto()方法
reduce阶段
在reduce阶段,reduce()方法接受所有映射到这个reduce的map输出后,也会调用job.setsortcomparatorclass()方法设置的key比较函数类,对所有数据进行排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用 job.setgroupingcomparatorclass()方法设置分组函数类。只要这个比较器比较的两个key相同,它们就属于同一组,它们的 value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入reducer的 reduce()方法,reduce()方法的输入是所有的key和它的value迭代器,同样注意输入与输出的类型必须与自定义的reducer中声明的一致。
、
#p##e#
接下来我们通过示例,可以很直观的了解二次排序的原理
输入文件 sort.txt 内容为
40 20
40 10
40 30
40 5
30 30
30 20
30 10
30 40
50 20
50 50
50 10
50 60
输出文件的内容(从小到大排序)如下
30 10
30 20
30 30
30 40
--------
40 5
40 10
40 20
40 30
--------
50 10
50 20
50 50
50 60
从输出的结果可以看出key实现了从小到大的排序,同时相同key的value也实现了从小到大的排序,这就是二次排序的结果
mapreduce二次排序的具体流程
在本例中要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类intpair ,它有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。二次排序的流程分为以下几步。
1、自定义 key
所有自定义的key应该实现接口writablecomparable,因为它是可序列化的并且可比较的。writablecomparable 的内部方法如下所示
// 反序列化,从流中的二进制转换成intpairpublic void readfields(datainput in) throws ioexception
// 序列化,将intpair转化成使用流传送的二进制public void write(dataoutput out)
// key的比较public int compareto(intpair o)
// 默认的分区类 hashpartitioner,使用此方法public int hashcode()
// 默认实现public boolean equals(object right)
2、自定义分区
自定义分区函数类firstpartitioner,是key的第一次比较,完成对所有key的排序。
public static class firstpartitioner extends partitioner《 intpair,intwritable》
在job中使用setpartitionerclasss()方法设置partitioner
job.setpartitionerclasss(firstpartitioner.class);
3、key的比较类
这是key的第二次比较,对所有的key进行排序,即同时完成intpair中的first和second排序。该类是一个比较器,可以通过两种方式实现。
1) 继承writablecomparator。
public static class keycomparator extends writablecomparator
必须有一个构造函数,并且重载以下方法。
public int compare(writablecomparable w1, writablecomparable w2)
2) 实现接口 rawcomparator。
上面两种实现方式,在job中,可以通过setsortcomparatorclass()方法来设置key的比较类。
job.setsortcomparatorclass(keycomparator.class);
注意:如果没有使用自定义的sortcomparator类,则默认使用key中compareto()方法对key排序。
4、定义分组类函数
在reduce阶段,构造一个与 key 相对应的 value 迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器。定义这个比较器,可以有两种方式。
1) 继承 writablecomparator。
public static class groupingcomparator extends writablecomparator
必须有一个构造函数,并且重载以下方法。
public int compare(writablecomparable w1, writablecomparable w2)
2) 实现接口 rawcomparator。
上面两种实现方式,在 job 中,可以通过 setgroupingcomparatorclass()方法来设置分组类。
job.setgroupingcomparatorclass(groupingcomparator.class);
另外注意的是,如果reduce的输入与输出不是同一种类型,则 combiner和reducer 不能共用 reducer 类,因为 combiner 的输出是 reduce 的输入。除非重新定义一个combiner。
3、代码实现
hadoop的example包中自带了一个mapreduce的二次排序算法,下面对 example包中的二次排序进行改进
package com.buaa;
import java.io.datainput;
import java.io.dataoutput;
import java.io.ioexception;
import org.apache.hadoop.io.writablecomparable;
/**
* @projectname secondarysort
* @packagename com.buaa
* @classname intpair
* @description 将示例数据中的key/value封装成一个整体作为key,同时实现 writablecomparable接口并重写其方法
* @author 刘吉超
* @date 2016-06-07 22:31:53
*/
public class intpair implements writablecomparable《intpair》{
private int first;
private int second;
public intpair(){
}
public intpair(int left, int right){
set(left, right);
}
public void set(int left, int right){
first = left;
second = right;
}
@override
public void readfields(datainput in) throws ioexception{
first = in.readint();
second = in.readint();
}
@override
public void write(dataoutput out) throws ioexception{
out.writeint(first);
out.writeint(second);
}
@override
public int compareto(intpair o)
{
if (first != o.first){
return first 《 o.first ? -1 : 1;
}else if (second != o.second){
return second 《 o.second ? -1 : 1;
}else{
return 0;
}
}
@override
public int hashcode(){
return first * 157 + second;
}
@override
public boolean equals(object right){
if (right == null)
return false;
if (this == right)
return true;
if (right instanceof intpair){
intpair r = (intpair) right;
return r.first == first && r.second == second;
}else{
return false;
}
}
public int getfirst(){
return first;
}
public int getsecond(){
return second;
}
}
package com.buaa;
import java.io.ioexception;import java.util.stringtokenizer;
import org.apache.hadoop.conf.configuration;import org.apache.hadoop.fs.filesystem;import org.apache.hadoop.fs.path;import org.apache.hadoop.io.intwritable;import org.apache.hadoop.io.longwritable;import org.apache.hadoop.io.text;import org.apache.hadoop.io.writablecomparable;import org.apache.hadoop.io.writablecomparator;import org.apache.hadoop.mapreduce.job;import org.apache.hadoop.mapreduce.mapper;import org.apache.hadoop.mapreduce.partitioner;import org.apache.hadoop.mapreduce.reducer;import org.apache.hadoop.mapreduce.lib.input.fileinputformat;import org.apache.hadoop.mapreduce.lib.input.textinputformat;import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;import org.apache.hadoop.mapreduce.lib.output.textoutputformat;
/**
* @projectname secondarysort
* @packagename com.buaa
* @classname secondarysort
* @description todo
* @author 刘吉超
* @date 2016-06-07 22:40:37*/
@suppresswarnings(“deprecation”)public class secondarysort {
public static class map extends mapper《longwritable, text, intpair, intwritable》 {
public void map(longwritable key, text value, context context) throws ioexception, interruptedexception {
string line = value.tostring();
stringtokenizer tokenizer = new stringtokenizer(line);
int left = 0;
int right = 0;
if (tokenizer.hasmoretokens()) {
left = integer.parseint(tokenizer.nexttoken());
if (tokenizer.hasmoretokens())
right = integer.parseint(tokenizer.nexttoken());
context.write(new intpair(left, right), new intwritable(right));
}
}
}
/*
* 自定义分区函数类firstpartitioner,根据 intpair中的first实现分区
*/
public static class firstpartitioner extends partitioner《intpair, intwritable》{
@override
public int getpartition(intpair key, intwritable value,int numpartitions){
return math.abs(key.getfirst() * 127) % numpartitions;
}
}
/*
* 自定义groupingcomparator类,实现分区内的数据分组
*/
@suppresswarnings(“rawtypes”)
public static class groupingcomparator extends writablecomparator{
protected groupingcomparator(){
super(intpair.class, true);
}
@override
public int compare(writablecomparable w1, writablecomparable w2){
intpair ip1 = (intpair) w1;
intpair ip2 = (intpair) w2;
int l = ip1.getfirst();
int r = ip2.getfirst();
return l == r ? 0 : (l 《 r ? -1 : 1);
}
}
public static class reduce extends reducer《intpair, intwritable, text, intwritable》 {
public void reduce(intpair key, iterable《intwritable》 values, context context) throws ioexception, interruptedexception {
for (intwritable val : values) {
context.write(new text(integer.tostring(key.getfirst())), val);
}
}
}
public static void main(string[] args) throws ioexception, interruptedexception, classnotfoundexception {
// 读取配置文件
configuration conf = new configuration();
// 判断路径是否存在,如果存在,则删除
path mypath = new path(args[1]);
filesystem hdfs = mypath.getfilesystem(conf);
if (hdfs.isdirectory(mypath)) {
hdfs.delete(mypath, true);
}
job job = new job(conf, “secondarysort”);
// 设置主类
job.setjarbyclass(secondarysort.class);
// 输入路径
fileinputformat.setinputpaths(job, new path(args[0]));
// 输出路径
fileoutputformat.setoutputpath(job, new path(args[1]));
// mapper
job.setmapperclass(map.class);
// reducer
job.setreducerclass(reduce.class);
// 分区函数
job.setpartitionerclass(firstpartitioner.class);
// 本示例并没有自定义sortcomparator,而是使用intpair中compareto方法进行排序 job.setsortcomparatorclass();
// 分组函数
job.setgroupingcomparatorclass(groupingcomparator.class);
// map输出key类型
job.setmapoutputkeyclass(intpair.class);
// map输出value类型
job.setmapoutputvalueclass(intwritable.class);
// reduce输出key类型
job.setoutputkeyclass(text.class);
// reduce输出value类型
job.setoutputvalueclass(intwritable.class);
// 输入格式
job.setinputformatclass(textinputformat.class);
// 输出格式
job.setoutputformatclass(textoutputformat.class);
system.exit(job.waitforcompletion(true) ? 0 : 1);
}
}
浅谈云计算的概念是如何产生的?
小米互联网空调降价 延续了米家空调极简风格
NAND闪存售价下降,需求量猛增,SK海力士Q3创新高销售额达到477亿元
核心交换机、汇聚交换机与普通交换机的区别介绍
亚马逊要求送货司机签署“生物识别同意书”
mapreduce二次排序_ mapreduce二次排序原理
红外热像仪在电源模块行业中的应用
多次面临致命局面,英特尔能否涅槃重回芯片王者地位?
如何解决电机控制器EMC测试过程中的干扰问题
联发科预计第二季度营收556亿元新台币 增长12%至20%
苹果自研CPU,对英特尔可以说是利大于弊
车载摄像头总线(C2B)—经济高效的摄像头连接
氢燃料电池的发电效率可以达到50%以上
光线跟踪的原理和优缺点的概述
电子电路设计之变容二极管等效电路
ThinkPad使用和保护的“三大纪律八项注意”
人工智能时代,制造业面临的机遇与挑战
单相交流调压电路和三相交流调压电路讲解
数字式万用表使用方法
物联网还是“勿联网”?对物联网安全隐患的反思