详解HDFS多文件Join操作的实例
作者:回首凡尘不做仙 发布时间:2023-09-20 07:44:14
标签:HDFS,多文件,Join操作
详解HDFS多文件Join操作的实例
最近在做HDFS文件处理之时,遇到了多文件Join操作,其中包括:All Join以及常用的Left Join操作,
下面是个简单的例子;采用两个表来做left join其中数据结构如下:
A 文件:
a|1b|2|c
B文件:
a|b|1|2|c
即:A文件中的第一、二列与B文件中的第一、三列对应;类似数据库中Table的主键/外键
代码如下:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import cn.eshore.traffic.hadoop.util.CommUtil;
import cn.eshore.traffic.hadoop.util.StringUtil;
/**
* @ClassName: DataJoin
* @Description: HDFS JOIN操作
* @author hadoop
* @date 2012-12-18 下午5:51:32
*/
public class InstallJoin extends Configured implements Tool {
private String static enSplitCode = "\\|";
private String static splitCode = "|";
// 自定义Reducer
public static class ReduceClass extends DataJoinReducerBase {
@Override
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
String joinedStr = "";
//该段判断用户生成Left join限制【其中tags表示文件的路径,install表示文件名称前缀】
//去掉则为All Join
if (tags.length == 1 && tags[0].toString().contains("install")) {
return null;
}
Map<String, String> map = new HashMap<String, String>();
for (int i = 0; i < values.length; i++) {
TaggedWritable tw = (TaggedWritable) values[i];
String line = ((Text) tw.getData()).toString();
String[] tokens = line.split(enSplitCode, 8);
String groupValue = tokens[6];
String type = tokens[7];
map.put(type, groupValue);
}
joinedStr += StringUtil.getCount(map.get("7"))+"|"+StringUtil.getCount(map.get("30"));
TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
retv.setTag((Text) tags[0]);
return retv;
}
}
// 自定义Mapper
public static class MapClass extends DataJoinMapperBase {
//自定义Key【类似数据库中的主键/外键】
@Override
protected Text generateGroupKey(TaggedMapOutput aRecord) {
String line = ((Text) aRecord.getData()).toString();
String[] tokens = line.split(CommUtil.enSplitCode);
String key = "";
String type = tokens[7];
//由于不同文件中的Key所在列有可能不同,所以需要动态生成Key,其中type为不同文件中的数据标识;如:A文件最后一列为a用于表示此数据为A文件数据
if ("7".equals(type)) {
key = tokens[0]+"|"+tokens[1];
}else if ("30".equals(type)) {
key = tokens[0]+"|"+tokens[2];
}
return new Text(key);
}
@Override
protected Text generateInputTag(String inputFile) {
return new Text(inputFile);
}
@Override
protected TaggedMapOutput generateTaggedMapOutput(Object value) {
TaggedWritable retv = new TaggedWritable((Text) value);
retv.setTag(this.inputTag);
return retv;
}
}
public static class TaggedWritable extends TaggedMapOutput {
private Writable data;
// 自定义
public TaggedWritable() {
this.tag = new Text("");
}
public TaggedWritable(Writable data) {
this.tag = new Text("");
this.data = data;
}
@Override
public Writable getData() {
return data;
}
@Override
public void write(DataOutput out) throws IOException {
this.tag.write(out);
out.writeUTF(this.data.getClass().getName());
this.data.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
this.tag.readFields(in);
String dataClz = in.readUTF();
if (this.data == null
|| !this.data.getClass().getName().equals(dataClz)) {
try {
this.data = (Writable) ReflectionUtils.newInstance(
Class.forName(dataClz), null);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
this.data.readFields(in);
}
}
/**
* job运行
*/
@Override
public int run(String[] paths) throws Exception {
int no = 0;
try {
Configuration conf = getConf();
JobConf job = new JobConf(conf, InstallJoin.class);
FileInputFormat.setInputPaths(job, new Path(paths[0]));
FileOutputFormat.setOutputPath(job, new Path(paths[1]));
job.setJobName("join_data_test");
job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TaggedWritable.class);
job.set("mapred.textoutputformat.separator", CommUtil.splitCode);
JobClient.runJob(job);
no = 1;
} catch (Exception e) {
throw new Exception();
}
return no;
}
//测试
public static void main(String[] args) {
String[] paths = {
"hdfs://master...:9000/home/hadoop/traffic/join/newtype",
"hdfs://master...:9000/home/hadoop/traffic/join/newtype/output" }
int res = 0;
try {
res = ToolRunner.run(new Configuration(), new InstallJoin(), paths);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(res);
}
}
如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
来源:http://blog.csdn.net/vtopqx/article/details/8589428


猜你喜欢
- Android 定时器实现图片的变换在Android中,要让每秒进行一次ui更新,就需要利用到定时器和handler,message的结合,
- 其中定义WIFI AP的几个状态public static final int WIFI_AP_STATE_DISABLING = 10;
- 解决库存扣减及订单创建时防止并发死锁的问题在我们日常开发的过程可有会遇到以下错误事务(进程 ID 82)与另一个进程被死锁在 锁 资源上,并
- PDF中的加数字签名是对文档权威性的有效证明。我们在向PDF文档添加签名时,需要准备可信任的签名证书。同时,对已有的签名,可验证签名是否有效
- 本文实例为大家分享了java实现鲜花销售系统的具体代码,供大家参考,具体内容如下一、练习目标1.体会数组的作用2.找到分层开发的感觉3.收获
- 本文实例为大家分享了java后台批量下载文件并压缩成zip下载的具体代码,供大家参考,具体内容如下因项目需要,将服务器上的图片文件压缩打包z
- application.properties大家都不陌生,我们在开发的时候,经常使用它来配置一些可以手动修改而且不用编译的变量,这样的作用在
- 本文实例讲述了Android实现Flip翻转动画效果的方法,分享给大家供大家学习借鉴。具体实现代码如下:LinearLayout locat
- 下面是android SDK中API中的主要java包的功能简介: android.app :提供高层的程序模型、提供基本的运行环
- 概述引入多态是继封装、继承之后,面向对象的第三大特性。 生活中,比如跑的动作,小猫、小狗和大象,跑起来是不一样的。再比如飞的动作,昆虫、鸟类
- 本文实例讲述了Android编程实现AIDL(跨进程通信)的方法。分享给大家供大家参考,具体如下:一. 概述:跨进程通信(AIDL),主要实
- 本文实例讲述了Android开发中4个常用的工具类。分享给大家供大家参考,具体如下:1、土司工具类(Toast管理)/** * Toast统
- 本文实例为大家分享了C#基于Sockets类实现TCP通讯的具体代码,供大家参考,具体内容如下最终效果TCPClientusing Syst
- 1.导入pom依赖<!-- mybatis-->  
- // Path类 IO命名空间 静态类 不能创建对象类名.string str =@"E:\C#程序设计基础入门教程\(第十一天)
- 查看JDK1.8 ArrayList的源代码1、默认初始容量为10 /** * Default i
- 一、介绍pair是将2个数据组合成一组数据,当需要这样的需求时就可以使用pair。当然你也可以自定义一个结构体struct。不过大家都是为了
- 引言二分查找底层依赖的是数组随机访问的特性,所以只能用数组来实现。如果数据存储在链表中,就真的没法用二分查找算法了吗?实际上,只需要对链表稍
- 我们知道java程序是运行在JVM中的,而JVM就是构建在内存上的虚拟机,那么内存模型JMM是做什么用的呢?我们考虑一个简单的赋值问题:in
- 常量:其值不变即为常量。语法:数据类型 常量名 = 值;doubl