java实现MapReduce对文件进行切分的示例代码
作者:liangzai2048 发布时间:2023-10-07 21:46:59
标签:java,MapReduce,切分
比如有海量的文本文件,如订单,页面点击事件的记录,量特别大,很难搞定。
那么我们该怎样解决海量数据的计算?
1、获取总行数
2、计算每个文件中存多少数据
3、split切分文件
4、reduce将文件进行汇总
例如这里有百万条数据,单个文件操作太麻烦,所以我们需要进行切分
在切分文件的过程中会出现文件不能整个切分的情况,可能有剩下的数据并没有被读取到,所以我们每个切分128条数据,不足128条再保留到一个文件中
创建MapTask
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class MapTask extends Thread {
//用来接收具体的哪一个文件
private File file;
private int flag;
public MapTask(File file, int flag) {
this.file = file;
this.flag = flag;
}
@Override
public void run() {
try {
BufferedReader br = new BufferedReader(new FileReader(file));
String line;
HashMap<String, Integer> map = new HashMap<String, Integer>();
while ((line = br.readLine()) != null) {
/**
* 统计班级人数HashMap存储
*/
String clazz = line.split(",")[4];
if (!map.containsKey(clazz)) {
map.put(clazz, 1);
} else {
map.put(clazz, map.get(clazz) + 1);
}
}
br.close();
BufferedWriter bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\part\\part---" + flag));
Set<Map.Entry<String, Integer>> entries = map.entrySet();
for (Map.Entry<String, Integer> entry : entries) {
String key = entry.getKey();
Integer value = entry.getValue();
bw.write(key + ":" + value);
bw.newLine();
}
bw.flush();
bw.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
创建Map
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Map {
public static void main(String[] args) {
long start = System.currentTimeMillis();
// 多线程连接池(线程池)
ExecutorService executorService = Executors.newFixedThreadPool(8);
// 获取文件列表
File file = new File("F:\\IDEADEMO\\shujiabigdata\\split");
File[] files = file.listFiles();
//创建多线程对象
int flag = 0;
for (File f : files) {
//为每一个文件启动一个线程
MapTask mapTask = new MapTask(f, flag);
executorService.submit(mapTask);
flag++;
}
executorService.shutdown();
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
创建ClazzSum
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
public class ClazzSum {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
BufferedReader br = new BufferedReader(
new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\bigstudents.txt"));
String line;
HashMap<String, Integer> map = new HashMap<String, Integer>();
while ((line = br.readLine()) != null) {
String clazz = line.split(",")[4];
if (!map.containsKey(clazz)) {
map.put(clazz, 1);
} else {
map.put(clazz, map.get(clazz) + 1);
}
}
System.out.println(map);
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
创建split128
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.ArrayList;
public class Split128 {
public static void main(String[] args) throws Exception {
BufferedReader br = new BufferedReader(
new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\students.txt"));
//用作标记文件,也作为文件名称
int index = 0;
BufferedWriter bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));
ArrayList<String> list = new ArrayList<String>();
String line;
//用作累计读取了多少行数据
int flag = 0;
int row = 0;
while ((line = br.readLine()) != null) {
list.add(line);
flag++;
// flag = 140
if (flag == 140) {// 一个文件读写完成,生成新的文件
row = 0 + 128 * index;
for (int i = row; i <= row + 127; i++) {
bw.write(list.get(i));
bw.newLine();
}
bw.flush();
bw.close();
/**
* 生成新的文件
* 计数清零
*/
index++;
flag = 12;
bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));
}
}
//文件读取剩余128*1.1范围之内
for (int i = list.size() - flag; i < list.size(); i++) {
bw.write(list.get(i));
bw.newLine();
}
bw.flush();
bw.close();
}
}
创建Reduce
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.HashMap;
public class Reduce {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
HashMap<String, Integer> map = new HashMap<String, Integer>();
File file = new File("F:\\IDEADEMO\\shujiabigdata\\part");
File[] files = file.listFiles();
for (File f : files) {
BufferedReader br = new BufferedReader(new FileReader(f));
String line;
while ((line = br.readLine()) != null) {
String clazz = line.split(":")[0];
int sum = Integer.valueOf(line.split(":")[1]);
if (!map.containsKey(clazz)) {
map.put(clazz, sum);
} else {
map.put(clazz, map.get(clazz) + sum);
}
}
}
long end = System.currentTimeMillis();
System.out.println(end-start);
System.out.println(map);
}
}
最后将文件切分了8份,这里采用了线程池,建立线程连接,多个线程同时启动,比单一文件采用多线程效率更高更好使。
来源:https://blog.csdn.net/hujieliang123/article/details/122546452
0
投稿
猜你喜欢
- 介绍和使用场景1)什么是消息一个事件,需要广播或者单独传递给某个接口2)为什么使用这个配置更新了,但是其他系统不知道是否更新SpringCl
- 本文实例为大家分享了java实现简单年龄计算器的具体代码,供大家参考,具体内容如下制作一个如下图年龄计算器根据题目,我做了一个由Calend
- 一.Unsafe类的源码分析JDK的rt.jar包中的Unsafe类提供了硬件级别的原子操作,Unsafe里面的方法都是native方法,通
- 概述从今天开始, 小白我将带大家开启 Java 数据结构 & 算法的新篇章.字符串匹配字符串匹配 (String Matching)
- 前言最近项目需要和Oracle数据库进行交互,然后我从Maven中央仓库下载数据库驱动jar包,但怎么都下不下来,我到Oracle官网上一看
- 在IntelliJ IDEA中一不小心将你本地代码给覆盖了,这个时候,你 ctrl + z
- 在idea中将创建的java web项目部署到Tomcat中采用的工具idea 2018.3.6 Tomcat71.先创建第一个新项目sec
- 前言Zookeeper的通过快照日志和事务日志将内存信息保存下来,记录下来每次请求的具体信息。尤其是其事务日志,每次处理事务请求时都需要将其
- 异常分类可查的异常(checked exceptions):Exception下除了RuntimeException外的异常不可查的异常(u
- java 配置MyEclipse Maven环境虽然我的大部分项目已经迁到Idea上去了,但是在写部分小的测试程序的时候还是习惯
- 1.1、获取http请求参数是一种刚需我想有的小伙伴肯定有过获取http请求的需要,比如想前置获取参数,统计请求数据做服务的接口签名校验敏感
- 相关知识:Java中三种简单注解介绍和代码实例一、作用用 @Deprecated注解的程序元素,不鼓励程序员使用这样的元素,通常是因为它很危
- 一、准备工作1、确定电脑上已经成功安装jdk7.0以上版本2、win10操作系统3、maven安装包 下载地址:http://maven.a
- 本教程基于 JetBrains IntelliJ IDEA 2018.3.6 编写,高版本未经测试,或有不兼容,请见谅!JetBrains
- 相关api见:点击进入/* * Copyright 2014 the original author or authors. * * Lic
- BeanPostProcessor接口作用:如果我们想在Spring容器中完成bean实例化、配置以及其他初始化方法前后要添加一些自己逻辑处
- 使用过 mybatis 框架的小伙伴们都知道,mybatis 是个半 orm 框架,通过写 mapper 接口就能自动实现数据库的增删改查,
- 代码如下所示:package com.hoo.util; import java.awt.Color; import
- 实验目的在C#和Rest/Restful以及其它的Web服务交互过程中,大量使用到JSON传递数据,如何快捷的转化C#对象到JSON和转化J
- 以下四种方式:1.继承Thread类,重写run方法2.实现Runnable接口,重写run方法,实现Runnable接口的实现类的实例对象