java实现MapReduce对文件进行切分的示例代码
作者:liangzai2048 发布时间:2023-10-07 21:46:59
标签:java,MapReduce,切分
比如有海量的文本文件,如订单,页面点击事件的记录,量特别大,很难搞定。
那么我们该怎样解决海量数据的计算?
1、获取总行数
2、计算每个文件中存多少数据
3、split切分文件
4、reduce将文件进行汇总
例如这里有百万条数据,单个文件操作太麻烦,所以我们需要进行切分
在切分文件的过程中会出现文件不能整个切分的情况,可能有剩下的数据并没有被读取到,所以我们每个切分128条数据,不足128条再保留到一个文件中
创建MapTask
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class MapTask extends Thread {
//用来接收具体的哪一个文件
private File file;
private int flag;
public MapTask(File file, int flag) {
this.file = file;
this.flag = flag;
}
@Override
public void run() {
try {
BufferedReader br = new BufferedReader(new FileReader(file));
String line;
HashMap<String, Integer> map = new HashMap<String, Integer>();
while ((line = br.readLine()) != null) {
/**
* 统计班级人数HashMap存储
*/
String clazz = line.split(",")[4];
if (!map.containsKey(clazz)) {
map.put(clazz, 1);
} else {
map.put(clazz, map.get(clazz) + 1);
}
}
br.close();
BufferedWriter bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\part\\part---" + flag));
Set<Map.Entry<String, Integer>> entries = map.entrySet();
for (Map.Entry<String, Integer> entry : entries) {
String key = entry.getKey();
Integer value = entry.getValue();
bw.write(key + ":" + value);
bw.newLine();
}
bw.flush();
bw.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
创建Map
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Map {
public static void main(String[] args) {
long start = System.currentTimeMillis();
// 多线程连接池(线程池)
ExecutorService executorService = Executors.newFixedThreadPool(8);
// 获取文件列表
File file = new File("F:\\IDEADEMO\\shujiabigdata\\split");
File[] files = file.listFiles();
//创建多线程对象
int flag = 0;
for (File f : files) {
//为每一个文件启动一个线程
MapTask mapTask = new MapTask(f, flag);
executorService.submit(mapTask);
flag++;
}
executorService.shutdown();
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
创建ClazzSum
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
public class ClazzSum {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
BufferedReader br = new BufferedReader(
new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\bigstudents.txt"));
String line;
HashMap<String, Integer> map = new HashMap<String, Integer>();
while ((line = br.readLine()) != null) {
String clazz = line.split(",")[4];
if (!map.containsKey(clazz)) {
map.put(clazz, 1);
} else {
map.put(clazz, map.get(clazz) + 1);
}
}
System.out.println(map);
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
创建split128
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.ArrayList;
public class Split128 {
public static void main(String[] args) throws Exception {
BufferedReader br = new BufferedReader(
new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\students.txt"));
//用作标记文件,也作为文件名称
int index = 0;
BufferedWriter bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));
ArrayList<String> list = new ArrayList<String>();
String line;
//用作累计读取了多少行数据
int flag = 0;
int row = 0;
while ((line = br.readLine()) != null) {
list.add(line);
flag++;
// flag = 140
if (flag == 140) {// 一个文件读写完成,生成新的文件
row = 0 + 128 * index;
for (int i = row; i <= row + 127; i++) {
bw.write(list.get(i));
bw.newLine();
}
bw.flush();
bw.close();
/**
* 生成新的文件
* 计数清零
*/
index++;
flag = 12;
bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));
}
}
//文件读取剩余128*1.1范围之内
for (int i = list.size() - flag; i < list.size(); i++) {
bw.write(list.get(i));
bw.newLine();
}
bw.flush();
bw.close();
}
}
创建Reduce
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.HashMap;
public class Reduce {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
HashMap<String, Integer> map = new HashMap<String, Integer>();
File file = new File("F:\\IDEADEMO\\shujiabigdata\\part");
File[] files = file.listFiles();
for (File f : files) {
BufferedReader br = new BufferedReader(new FileReader(f));
String line;
while ((line = br.readLine()) != null) {
String clazz = line.split(":")[0];
int sum = Integer.valueOf(line.split(":")[1]);
if (!map.containsKey(clazz)) {
map.put(clazz, sum);
} else {
map.put(clazz, map.get(clazz) + sum);
}
}
}
long end = System.currentTimeMillis();
System.out.println(end-start);
System.out.println(map);
}
}
最后将文件切分了8份,这里采用了线程池,建立线程连接,多个线程同时启动,比单一文件采用多线程效率更高更好使。
来源:https://blog.csdn.net/hujieliang123/article/details/122546452


猜你喜欢
- 最近在用SpringMvc写项目的时候,遇到一个问题,就是方法的鉴权问题,这个问题弄了一天了终于解决了,下面看下解决方法项目需求:需要鉴权的
- 本文实例讲述了C#泛型委托的用法。分享给大家供大家参考。具体分析如下:冒泡排序大家都知道,例如一个整形数组,可以用冒泡排序来使它按从小到大的
- 本文简单介绍如何引入validation的步骤,如何通过自定义validation减少代码量,提高生产力。特别提及:非基本类型属性的vali
- 前言如今多线程编程已成为了现代软件开发中的重要部分,而并发编程中的线程同步问题更是一道难以逾越的坎。在Java语言中,synchronize
- 首先,想要明白hashCode的作用,你必须要先知道Java中的集合。 总的来说,Java中的集合(Collection)有两类,一类是Li
- 业务场景通常微服务对于用户认证信息解析有两种方案在 gateway 就解析用户的 token 然后路由的时候把 userId 等相关信息添加
- 生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,如下图所示生产者向空间里存放数据,而消费者取用数据
- mybatis一直加载xml,找到错误我们在写springmvc+mybatis项目,启动项目的时候,mapper配置文件一直刷,一直加载。
- 本次主要分享的是3个免费的二维码接口的对接代码和测试得出的注意点及区别,有更好处理方式多多交流,相互促进进步;最近在学习JavsScript
- 效果图界面绘制操作private Point? _startPoint = null; private void Contain
- 首次使用idea需要配置哪些东西?最近因为我的eclipse无法配置sts,于是将战场转移至idea,首次使用idea,所有的配置都得重新开
- 本文实例为大家分享了Android自定义广播接收的具体代码,供大家参考,具体内容如下实现效果:MainActivity.java代码:pac
- Java中Class类的作用与深入理解 在程序运行期间,Java运行时系统始终为所有的对象维护一个被称为运行时的类型标识。这个信
- 本文实例讲述了C#中数组初始化与数组元素复制的方法。分享给大家供大家参考。具体如下:下面的代码演示如何创建和初始化数组,以及C#中如何将数组
- 目录前言常量池反编译代码验证字符串初始化操作总结前言在深入学习字符串类之前,我们先搞懂JVM是怎样处理新生字符串的。当你知道字符串的初始化细
- 这个是设置定时提醒的功能,即设置几点几分后提醒,用的是给系统设置个时间点,当系统时间到达设置的时间点的时候就会给我们发送一个广播,然后达到时
- 思路:先获得当前季度的开始和结束日期,在当前日期的基础上往前推3个月即上个季度的开始和结束日期/** * @param fla
- 本文以实例阐述了C++中形参与实参的区别,有助于读者加深对于C++形参与实参的认识。形参出现在函数定义中,在整个函数体内都可以使用, 离开该
- 循环轮播的方法有两种,一种是使用定时器另外一种是使用手指拨动,相比较而言,定时器实现循环播放比较容易,只要在定时器的消息里加上简单几段代码即
- 目录1 Semaphore的主要方法2 实例讲解实现单例模式3 源码解析构造方法获取许可释放许可减小许可数量获取剩余许可数量前言:Semap