Cloud Computing Lab: Java MapReduce Programming
Author: wow_awsl_qwq   Published: 2021-10-08 14:25:52
Lab topic: MapReduce programming
Lab content: This lab uses the Java API provided by Hadoop to write MapReduce programs.
Lab objectives: Master MapReduce programming and understand how MapReduce works.
[Lab assignment] Simple traffic statistics
Consider the following log file:
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230513 00-FD-07-A4-72-B8:CMCC 120.196.40.8 i02.c.aliimg.com 248 0 200
13826230523 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230533 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230543 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Video website 1527 2106 200
13926230553 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13826230563 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13926230573 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
18912688533 00-FD-07-A4-72-B8:CMCC 220.196.100.82 Integrated portal 1938 2910 200
18912688533 00-FD-07-A4-72-B8:CMCC 220.196.100.82 i02.c.aliimg.com 3333 21321 200
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Search Engines 9531 9531 200
13826230523 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
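The log file records each mobile user's network traffic over a period of time. Its fields are:
phone number, MAC address, IP address, domain, upstream traffic (bytes), downstream traffic (bytes), plan type
Based on this log, compute each user's total traffic over the period (upstream + downstream). The result format is:
phone number  byte count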
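The map, shuffle and reduce flow of this job can be tried without a cluster. The following is a minimal plain-Java sketch, not the lab's Hadoop code. The class TrafficSumSketch and its methods are hypothetical, and the sketch assumes that fields are separated by whitespace. The domain field may itself contain spaces (Video website, Integrated portal). For that reason the upstream and downstream counts are read as the third-to-last and second-to-last fields rather than by a fixed position.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of the traffic job: map -> shuffle -> reduce.
public class TrafficSumSketch {

    // The 13 log lines from the assignment.
    public static final List<String> SAMPLE = Arrays.asList(
        "13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200",
        "13726230513 00-FD-07-A4-72-B8:CMCC 120.196.40.8 i02.c.aliimg.com 248 0 200",
        "13826230523 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200",
        "13726230533 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200",
        "13726230543 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Video website 1527 2106 200",
        "13926230553 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200",
        "13826230563 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200",
        "13926230573 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200",
        "18912688533 00-FD-07-A4-72-B8:CMCC 220.196.100.82 Integrated portal 1938 2910 200",
        "18912688533 00-FD-07-A4-72-B8:CMCC 220.196.100.82 i02.c.aliimg.com 3333 21321 200",
        "13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Search Engines 9531 9531 200",
        "13826230523 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200",
        "13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200");

    // map: one (phone, upstream + downstream) pair per log line.
    public static Map.Entry<String, Long> map(String line) {
        String[] f = line.trim().split("\\s+");
        long up = Long.parseLong(f[f.length - 3]);   // upstream bytes
        long down = Long.parseLong(f[f.length - 2]); // downstream bytes
        return new AbstractMap.SimpleEntry<String, Long>(f[0], up + down);
    }

    public static Map<String, Long> run(List<String> lines) {
        // shuffle: group map outputs by key; TreeMap keeps keys sorted, like Hadoop's sort phase
        Map<String, List<Long>> groups = new TreeMap<>();
        for (String line : lines) {
            Map.Entry<String, Long> kv = map(line);
            groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        // reduce: sum the values of each key, as WcReduce does
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, List<Long>> g : groups.entrySet()) {
            long sum = 0;
            for (long v : g.getValue()) {
                sum += v;
            }
            totals.put(g.getKey(), sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Long> e : run(SAMPLE).entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

Running main prints one line per phone number, sorted by key in the same way Hadoop's shuffle sorts map output. For example, 13726230503 totals 73386 bytes across its three log lines.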
Lab results:
Lab code:
WcMap.java
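import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits (phone number, upstream + downstream bytes) for each log line.
public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable> {
    private final Text phone = new Text();
    private final LongWritable bytes = new LongWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Fields are whitespace-separated, but the domain may contain spaces
        // ("Video website"), so the traffic fields are counted from the end:
        // ... upstream downstream planType
        String[] words = value.toString().trim().split("\\s+");
        if (words.length < 7) {
            return; // skip blank or malformed lines
        }
        long up = Long.parseLong(words[words.length - 3]);
        long down = Long.parseLong(words[words.length - 2]);
        phone.set(words[0]);
        bytes.set(up + down);
        context.write(phone, bytes);
    }
}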
WcReduce.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums all byte counts received for one phone number.
public class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    private final LongWritable total = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        total.set(count);
        context.write(key, total);
    }
}
WcRunner.java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WcRunner {
    // NameNode address of the lab cluster.
    private static final String HDFS = "hdfs://master:9000";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WcRunner.class);
        job.setMapperClass(WcMap.class);
        // Summing is associative and commutative, so the reducer also works as a combiner.
        job.setCombinerClass(WcReduce.class);
        job.setReducerClass(WcReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // HDFS input and output directories, read from standard input.
        Scanner sc = new Scanner(System.in);
        System.out.print("inputPath:");
        String inputPath = sc.next();
        System.out.print("outputPath:");
        String outputPath = sc.next();
        sc.close();

        FileSystem fs = FileSystem.get(new URI(HDFS), conf);
        try {
            // Upload the log from the lab VM's local disk. The target path is fixed,
            // so use /mapreduce/WordCount/input as inputPath to process this file.
            fs.copyFromLocalFile(new Path("/headless/Desktop/workspace/mapreduce/WordCount/data/1.txt"),
                    new Path("/mapreduce/WordCount/input/1.txt"));
        } catch (IOException e) {
            e.printStackTrace(); // keep going: the input may already be in HDFS
        }
        // MapReduce will not overwrite an existing output directory, so remove it first.
        if (fs.delete(new Path(outputPath), true)) {
            System.out.println("Directory " + outputPath + " has been deleted successfully!");
        }

        FileInputFormat.setInputPaths(job, new Path(HDFS + inputPath));
        FileOutputFormat.setOutputPath(job, new Path(HDFS + outputPath));
        if (!job.waitForCompletion(true)) {
            System.err.println("Job failed");
            System.exit(1);
        }

        // With the default single reducer, all results are in part-r-00000.
        Path srcPath = new Path(outputPath + "/part-r-00000");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(srcPath), StandardCharsets.UTF_8))) {
            System.out.println("Results:");
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
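[Lab assignment] Inverted index with line numbers
The earlier inverted-index lab produced, for each word, the files it appears in and the number of times it occurs in each file. Modify that implementation so that the inverted index also gives the exact line numbers at which each word appears in each file. The output format is:
word filename:line,filename:line,filename:line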
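The expected output can also be sketched in plain Java under simplifying assumptions. Each file's content is held in memory as a string, and the file names and contents in main are invented for illustration. The class LineIndexSketch is hypothetical. It tokenizes with StringTokenizer, as the lab mapper does, and numbers lines from 1. A word is listed once per occurrence, so a word repeated on one line gets repeated entries.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical in-memory version of the line-number inverted index.
public class LineIndexSketch {

    // files: fileName -> file content. Returns word -> "file:line,file:line,...".
    public static Map<String, String> build(Map<String, String> files) {
        Map<String, List<String>> postings = new TreeMap<>();
        for (Map.Entry<String, String> file : files.entrySet()) {
            String[] lines = file.getValue().split("\n");
            for (int i = 0; i < lines.length; i++) {
                StringTokenizer itr = new StringTokenizer(lines[i]);
                while (itr.hasMoreTokens()) {
                    // one "file:line" entry per occurrence; lines are numbered from 1
                    postings.computeIfAbsent(itr.nextToken(), k -> new ArrayList<>())
                            .add(file.getKey() + ":" + (i + 1));
                }
            }
        }
        Map<String, String> index = new TreeMap<>();
        postings.forEach((word, entries) -> index.put(word, String.join(",", entries)));
        return index;
    }

    public static void main(String[] args) {
        Map<String, String> files = new LinkedHashMap<>();
        files.put("1.txt", "Hello MapReduce\nHello Hadoop");
        files.put("3.txt", "MapReduce is simple MapReduce");
        build(files).forEach((word, entries) -> System.out.println(word + "\t" + entries));
    }
}
```

With the sample input, MapReduce maps to 1.txt:1,3.txt:1,3.txt:1. In a real MapReduce job, the order of the entries within an output line depends on the order in which values reach the reducer, and that order is not guaranteed.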
Lab results:
In the result, MapReduce appears twice on line 1 of 3.txt, so line number 1 is listed twice for that file.
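MyMapper.java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Emits (word, "fileName:lineNumber") for every token of every line.
public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text keyInfo = new Text();
    private final Text valueInfo = new Text();
    private String fileName;
    // Line counter for the current split. It equals the line number in the file
    // only if the whole file is read as one split (true for small lab files,
    // or when the job uses a non-splittable input format).
    private int lineNo = 0;

    @Override
    protected void setup(Context context) {
        fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        lineNo++; // map() is called once per line, in order, within a split
        valueInfo.set(fileName + ":" + lineNo);
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            keyInfo.set(itr.nextToken());
            context.write(keyInfo, valueInfo);
        }
    }
}
MyCombiner.java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Pre-joins the "fileName:lineNumber" values of one word on the map side.
// Hadoop may run a combiner zero, one or several times, so it must keep the
// key unchanged and emit values that the reducer can join again.
public class MyCombiner extends Reducer<Text, Text, Text, Text> {
    private final Text info = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            if (sb.length() > 0) sb.append(',');
            sb.append(value.toString());
        }
        info.set(sb.toString());
        context.write(key, info);
    }
}
MyReducer.java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Joins all entries of one word into "fileName:lineNumber,fileName:lineNumber,...".
public class MyReducer extends Reducer<Text, Text, Text, Text> {
    private final Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            if (sb.length() > 0) sb.append(',');
            sb.append(value.toString());
        }
        result.set(sb.toString());
        context.write(key, result);
    }
}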
MyRunner.java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyRunner {
    // NameNode address of the lab cluster.
    private static final String HDFS = "hdfs://master:9000";

    // Reads every file as a single split, so the line counter in MyMapper
    // starts at the first line of the file and never restarts mid-file.
    public static class NonSplittableTextInputFormat extends TextInputFormat {
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyRunner.class);
        job.setInputFormatClass(NonSplittableTextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setCombinerClass(MyCombiner.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // HDFS input and output directories, read from standard input.
        Scanner sc = new Scanner(System.in);
        System.out.print("inputPath:");
        String inputPath = sc.next();
        System.out.print("outputPath:");
        String outputPath = sc.next();
        sc.close();

        // MapReduce will not overwrite an existing output directory, so remove it first.
        FileSystem fs = FileSystem.get(new URI(HDFS), conf);
        if (fs.delete(new Path(outputPath), true)) {
            System.out.println("Directory " + outputPath + " has been deleted successfully!");
        }

        FileInputFormat.setInputPaths(job, new Path(HDFS + inputPath));
        FileOutputFormat.setOutputPath(job, new Path(HDFS + outputPath));
        if (!job.waitForCompletion(true)) {
            System.err.println("Job failed");
            System.exit(1);
        }

        // With the default single reducer, all results are in part-r-00000.
        Path srcPath = new Path(outputPath + "/part-r-00000");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(srcPath), StandardCharsets.UTF_8))) {
            System.out.println("Results:");
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
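A caveat about the line numbers: MyMapper counts lines in a field of the mapper object. Hadoop runs one map task per input split, so that counter restarts from 1 at every split boundary. The hypothetical simulation below (class SplitLineNumberSketch) measures split sizes in lines for simplicity, although real splits are byte ranges. It numbers five lines once as a single split and once as two splits. Only the single-split case yields true line numbers, which is why this approach relies on each file being read as one split.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation: line numbers produced by a per-split counter.
public class SplitLineNumberSketch {

    // splitSizes[i] = number of lines handled by the i-th simulated map task.
    public static List<Integer> perSplitNumbers(int... splitSizes) {
        List<Integer> numbers = new ArrayList<>();
        for (int size : splitSizes) {
            int counter = 0; // a new map task, and a new mapper object, per split
            for (int j = 0; j < size; j++) {
                numbers.add(++counter);
            }
        }
        return numbers;
    }

    public static void main(String[] args) {
        // One split of 5 lines: the counter matches the real line numbers.
        System.out.println(perSplitNumbers(5));
        // Two splits of 3 and 2 lines: lines 4 and 5 are labelled 1 and 2.
        System.out.println(perSplitNumbers(3, 2));
    }
}
```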
Source: https://blog.csdn.net/qq_42641977/article/details/121578883

