hadoop二次排序的原理和实现方法
作者:白马不是马 发布时间:2023-01-16 22:42:31
默认情况下,Map输出的结果会对Key进行默认的排序,但是有时候需要对Key排序的同时还需要对Value进行排序,这时候就要用到二次排序了。下面我们来说说二次排序
1、二次排序原理
我们把二次排序分为以下几个阶段
Map起始阶段
在Map阶段,使用job.setInputFormatClass()定义的InputFormat,将输入的数据集分割成小数据块split,同时InputFormat提供一个RecordReader的实现。在这里我们使用的是TextInputFormat,它提供的RecordReader会将文本的行号作为Key,这一行的文本作为Value。这就是自定 Mapper的输入是<LongWritable,Text> 的原因。然后调用自定义Mapper的map方法,将一个个<LongWritable,Text>键值对输入给Mapper的map方法
Map最后阶段
在Map阶段的最后,会先调用job.setPartitionerClass()对这个Mapper的输出结果进行分区,每个分区映射到一个Reducer。每个分区内又调用job.setSortComparatorClass()设置的Key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass()设置 Key比较函数类,则使用Key实现的compareTo()方法
Reduce阶段
在Reduce阶段,reduce()方法接受所有映射到这个Reduce的map输出后,也会调用job.setSortComparatorClass()方法设置的Key比较函数类,对所有数据进行排序。然后开始构造一个Key对应的Value迭代器。这时就要用到分组,使用 job.setGroupingComparatorClass()方法设置分组函数类。只要这个比较器比较的两个Key相同,它们就属于同一组,它们的 Value放在一个Value迭代器,而这个迭代器的Key使用属于同一个组的所有Key的第一个Key。最后就是进入Reducer的 reduce()方法,reduce()方法的输入是所有的Key和它的Value迭代器,同样注意输入与输出的类型必须与自定义的Reducer中声明的一致
接下来我们通过示例,可以很直观的了解二次排序的原理
输入文件 sort.txt 内容为
40 20 40 10 40 30 40 5 30 30 30 20 30 10 30 40 50 20 50 50 50 10 50 60
输出文件的内容(从小到大排序)如下
30 10 30 20 30 30 30 40 -------- 40 5 40 10 40 20 40 30 -------- 50 10 50 20 50 50 50 60
从输出的结果可以看出Key实现了从小到大的排序,同时相同Key的Value也实现了从小到大的排序,这就是二次排序的结果
2、二次排序的具体流程
在本例中要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair ,它有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。二次排序的流程分为以下几步。
在本例中要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair ,它有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。二次排序的流程分为以下几步。
1、自定义 key
所有自定义的key应该实现接口WritableComparable,因为它是可序列化的并且可比较的。WritableComparable 的内部方法如下所示
// 反序列化,从流中的二进制转换成IntPair
public void readFields(DataInput in) throws IOException
// 序列化,将IntPair转化成使用流传送的二进制
public void write(DataOutput out)
// key的比较
public int compareTo(IntPair o)
// 默认的分区类 HashPartitioner,使用此方法
public int hashCode()
// 默认实现
public boolean equals(Object right)
2、自定义分区
自定义分区函数类FirstPartitioner,是key的第一次比较,完成对所有key的排序。
public static class FirstPartitioner extends Partitioner< IntPair,IntWritable>
在job中使用setPartitionerClasss()方法设置Partitioner
job.setPartitionerClasss(FirstPartitioner.Class);
3、Key的比较类
这是Key的第二次比较,对所有的Key进行排序,即同时完成IntPair中的first和second排序。该类是一个比较器,可以通过两种方式实现。
1) 继承WritableComparator。
public static class KeyComparator extends WritableComparator
必须有一个构造函数,并且重载以下方法。
public int compare(WritableComparable w1, WritableComparable w2)
2) 实现接口 RawComparator。
上面两种实现方式,在Job中,可以通过setSortComparatorClass()方法来设置Key的比较类。
job.setSortComparatorClass(KeyComparator.Class);
注意:如果没有使用自定义的SortComparator类,则默认使用Key中compareTo()方法对Key排序。
4、定义分组类函数
在Reduce阶段,构造一个与 Key 相对应的 Value 迭代器的时候,只要first相同就属于同一个组,放在一个Value迭代器。定义这个比较器,可以有两种方式。
1) 继承 WritableComparator。
public static class GroupingComparator extends WritableComparator
必须有一个构造函数,并且重载以下方法。
public int compare(WritableComparable w1, WritableComparable w2)
2) 实现接口 RawComparator。
上面两种实现方式,在 Job 中,可以通过 setGroupingComparatorClass()方法来设置分组类。
job.setGroupingComparatorClass(GroupingComparator.Class);
另外注意的是,如果reduce的输入与输出不是同一种类型,则 Combiner和Reducer 不能共用 Reducer 类,因为
Combiner 的输出是 reduce 的输入。除非重新定义一个Combiner。
3、代码实现
Hadoop的example包中自带了一个MapReduce的二次排序算法,下面对 example包中的二次排序进行改进
package com.buaa;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
* @ProjectName SecondarySort
* @PackageName com.buaa
* @ClassName IntPair
* @Description 将示例数据中的key/value封装成一个整体作为Key,同时实现 WritableComparable接口并重写其方法
* @Author 刘吉超
* @Date 2016-06-07 22:31:53
*/
public class IntPair implements WritableComparable<IntPair>{
private int first;
private int second;
public IntPair(){
}
public IntPair(int left, int right){
set(left, right);
}
public void set(int left, int right){
first = left;
second = right;
}
@Override
public void readFields(DataInput in) throws IOException{
first = in.readInt();
second = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException{
out.writeInt(first);
out.writeInt(second);
}
@Override
public int compareTo(IntPair o)
{
if (first != o.first){
return first < o.first ? -1 : 1;
}else if (second != o.second){
return second < o.second ? -1 : 1;
}else{
return 0;
}
}
@Override
public int hashCode(){
return first * 157 + second;
}
@Override
public boolean equals(Object right){
if (right == null)
return false;
if (this == right)
return true;
if (right instanceof IntPair){
IntPair r = (IntPair) right;
return r.first == first && r.second == second;
}else{
return false;
}
}
public int getFirst(){
return first;
}
public int getSecond(){
return second;
}
}
package com.buaa;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* @ProjectName SecondarySort
* @PackageName com.buaa
* @ClassName SecondarySort
* @Description TODO
* @Author 刘吉超
* @Date 2016-06-07 22:40:37
*/
@SuppressWarnings("deprecation")
public class SecondarySort {
public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
int left = 0;
int right = 0;
if (tokenizer.hasMoreTokens()) {
left = Integer.parseInt(tokenizer.nextToken());
if (tokenizer.hasMoreTokens())
right = Integer.parseInt(tokenizer.nextToken());
context.write(new IntPair(left, right), new IntWritable(right));
}
}
}
/*
* 自定义分区函数类FirstPartitioner,根据 IntPair中的first实现分区
*/
public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>{
@Override
public int getPartition(IntPair key, IntWritable value,int numPartitions){
return Math.abs(key.getFirst() * 127) % numPartitions;
}
}
/*
* 自定义GroupingComparator类,实现分区内的数据分组
*/
@SuppressWarnings("rawtypes")
public static class GroupingComparator extends WritableComparator{
protected GroupingComparator(){
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2){
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
int l = ip1.getFirst();
int r = ip2.getFirst();
return l == r ? 0 : (l < r ? -1 : 1);
}
}
public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> {
public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
for (IntWritable val : values) {
context.write(new Text(Integer.toString(key.getFirst())), val);
}
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 读取配置文件
Configuration conf = new Configuration();
// 判断路径是否存在,如果存在,则删除
Path mypath = new Path(args[1]);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}
Job job = new Job(conf, "secondarysort");
// 设置主类
job.setJarByClass(SecondarySort.class);
// 输入路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Mapper
job.setMapperClass(Map.class);
// Reducer
job.setReducerClass(Reduce.class);
// 分区函数
job.setPartitionerClass(FirstPartitioner.class);
// 本示例并没有自定义SortComparator,而是使用IntPair中compareTo方法进行排序 job.setSortComparatorClass();
// 分组函数
job.setGroupingComparatorClass(GroupingComparator.class);
// map输出key类型
job.setMapOutputKeyClass(IntPair.class);
// map输出value类型
job.setMapOutputValueClass(IntWritable.class);
// reduce输出key类型
job.setOutputKeyClass(Text.class);
// reduce输出value类型
job.setOutputValueClass(IntWritable.class);
// 输入格式
job.setInputFormatClass(TextInputFormat.class);
// 输出格式
job.setOutputFormatClass(TextOutputFormat.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
总结
以上所述是小编给大家介绍的hadoop二次排序的原理和实现方法,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!
来源:https://blog.csdn.net/kuodannie1668/article/details/82218205


猜你喜欢
- 在进行大量数据训练神经网络的时候,可能需要批量读取数据。于是参考了这篇文章的代码,结果发现数据一直批量循环输出,不会在数据的末尾自动停止。然
- 通常,你需要获得当前日期和计算一些其他的日期,例如,你的程序可能需要判断一个月的第一天或者最后一天。你们大部分人大概都知道怎样把日期进行分割
- 一、表示修饰符。可以在模块或者类的定义层内对函数进行修饰。出现在函数定义的前一行,不允许和函数定义在同一行。一个修饰符就是一个函数,它将被修
- 1、异常的传播当在函数中出现异常时,如果在函数中对异常进行了处理,则异常不会再继续传播。如果函数中没有对异常进行处理,则异常会继续向函数调用
- 本文实例讲述了Python实现对象转换为xml的方法。分享给大家供大家参考,具体如下:# -*- coding:UTF-8 -*-'
- 实例化对象名._类名__私有属性名 class Flylove:price = 123 def __init__(self):s
- mulLine = """Hello!!! Wellcome to Python's world! T
- 一、作用主要用于保留组件状态或避免重新渲染。二、用法<keep-alive> 包裹动态组件时,会缓存不活动的组件实例,
- 在日常工作中,除了需要从 JSON 转化为 Go 的数据结构。但往往相反的情况是:我们需要将数据以 JSON 字符串的形式发送到 Web 服
- 前言很多时候需要对自己模型进行性能评估,对于一些理论上面的知识我想基本不用说明太多,关于校验模型准确度的指标主要有混淆矩阵、准确率、精确率、
- 之前有写过一篇浏览器的tab设计,这回说说网站的tab设计。一说到tab很自然地就想到了导航、信息架构。随着网站信息结构的复杂化,选择tab
- 1.项目目录及文件说明:manage.pydjango中的一个命令行工具,管理django项目;__init__.py空文件,告诉pytho
- 前言我自己常用的简单Python代码调试工具是IDLE和Sublime3,IDLE很少使用了,基本上用Sublime3稍微多一些,Subli
- 引言少年,你在怀着非法的心态看一篇简短的硬核科普!先抛问题:如何杀掉一个正在等待 TCP 连接的 Thread?由于众所周知的原因,在国内使
- FULLTEXT以前使用查找时都是以 %关键字% 进行模糊查询结果的,这种查询方式有一些缺点,比如不能查询多个列必须手动添加条件以实现,效率
- 我就废话不多说了,大家还是看代码吧! import PyPDF2 import repdf_file = open('xxx.pdf
- 效果演示开发工具Python版本: 3.6.4相关模块:pygame模块;PyQt5模块;以及一些Python自带的模块。环境搭建安装Pyt
- 1.URLError首先解释下URLError可能产生的原因: 网络无连接,即本机无法上网 &
- <%@LANGUAGE="VBSCRIPT" CODEPAGE="936"%> &nbs
- 听说 FaceBook 开放其网站的代码了,期前也算是了解过 FaceBook 的架构,所以重点就是看其代码的质量。可以毫不夸张的说,Fac