hadoop上传文件功能实例代码
作者:mrr 发布时间:2021-11-01 00:57:59
hdfs上的文件是手动执行命令从本地linux上传至hdfs的。在真实的运行环境中,我们不可能每次手动执行命令上传的,这样太过繁琐。那么,我们可以使用hdfs提供的Java api实现文件上传至hdfs,或者直接从ftp上传至hdfs。
然而,需要说明一点,之前笔者是要运行MR,都需要每次手动执行yarn jar,在实际的环境中也不可能每次手动执行。像我们公司是使用了索答的调度平台/任务监控平台,可以定时的以工作流执行我们的程序,包括普通java程序和MR。其实,这个调度平台就是使用了quartz。当然,这个调度平台也提供其它的一些功能,比如web展示、日志查看等,所以也不是免费的。
首先,给大家简单介绍一下hdfs。hdfs是以流式数据访问模式来存储超大文件,hdfs的构建思路是一次写入,多次读取,这样才是最高效的访问模式。hdfs是为高数据吞吐量应用优化的,所以会以提高时间延迟为代价。对于低延时的访问需求,我们可以使用hbase。
然后,还要知道hdfs中块(block)的概念,默认为64MB。块是hdfs的数据读写的最小单位,通常每个map任务一次只处理一个block,像我们对集群性能评估就会使用到这个概念,比如目前有多少节点,每个节点的磁盘空间、cpu以及所要处理的数据量、网络带宽,通过这些信息来进行性能评估。我们可以使用Hadoop fsck / -files -blocks列出文件系统中各个文件由哪些块构成。
然后,再就是要知道namenode和datanode,这个在之前的博文已经介绍过,下面看看cm环境中hdfs的管理者(namenode)和工作者(datanode),如下
在yarn环境中是可以有多个nameNode的。此环境中没有SecondaryNameNode,当然也可以有。
好了,关于hdfs的基本概念就讲到这儿了,下面来看看具体的代码。
一、java实现上传本地文件至hdfs
这里,可以直接使用hdfs提供的java api即可实现,代码如下:
package com.bjpowernode.hdfs.local;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* ClassName:UploadLocalFileToHdfs <br/>
* Function: 本地文件上传至hdfs. <br/>
* Date: 2016年3月28日 下午10:06:05 <br/>
* @author qiyongkang
* @version
* @since JDK 1.6
* @see
*/
public class UploadLocalFileToHdfs {
public static void main(String[] args) {
Configuration conf = new Configuration();
String localDir = "/home/qiyongkang";
String hdfsDir = "/qiyongkang";
try{
Path localPath = new Path(localDir);
Path hdfsPath = new Path(hdfsDir);
FileSystem hdfs = FileSystem.get(conf);
hdfs.copyFromLocalFile(localPath, hdfsPath);
}catch(Exception e){
e.printStackTrace();
}
}
}
注意,这里hdfs上传目录如果不存在的话,hdfs会自动创建,比较智能。
打完包后,上传至服务器,执行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
,然后执行hadoop fs -ls /qiyongkang便可看到:
二、java实现上传ftp上的文件至hdfs
首先,我们得准备一个ftp服务器,关于ftp服务器的搭建,大家可以查阅资料,笔者就不赘述了。
其实,从ftp上拉取文件上传到hdfs上,这个过程大家不要想复杂了,我们讲本地文件上传到hdfs,其实就是采用流的方式。因此,我们可以直接读取ftp上的文件流,然后以流的方式写入到hdfs。
下面,直接贴出代码:
package com.bjpowernode.hdfs.ftp;
import java.io.InputStream;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
/**
* ClassName:UploadFtpFileToHdfs <br/>
* Function: TODO ADD FUNCTION. <br/>
* Reason: TODO ADD REASON. <br/>
* Date: 2016年3月28日 下午10:50:37 <br/>
*
* @author qiyongkang
* @version
* @since JDK 1.6
* @see
*/
public class UploadFtpFileToHdfs {
public static void main(String[] args) {
Configuration conf = new Configuration();
loadFromFtpToHdfs("172.31.26.200", "qiyongkang", "qyk123456", "/www/input/", "/qiyongkang/", conf);
}
/**
*
* loadFromFtpToHdfs:将数据从ftp上传到hdfs上. <br/>
*
* @author qiyongkang
* @param ip
* @param username
* @param password
* @param filePath
* @param outputPath
* @param conf
* @return
* @since JDK 1.6
*/
private static boolean loadFromFtpToHdfs(String ip, String username, String password, String filePath,
String outputPath, Configuration conf) {
FTPClient ftp = new FTPClient();
InputStream inputStream = null;
FSDataOutputStream outputStream = null;
boolean flag = true;
try {
ftp.connect(ip);
ftp.login(username, password);
ftp.setFileType(FTP.BINARY_FILE_TYPE);
ftp.setControlEncoding("UTF-8");
int reply = ftp.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftp.disconnect();
}
FTPFile[] files = ftp.listFiles(filePath);
FileSystem hdfs = FileSystem.get(conf);
for (FTPFile file : files) {
if (!(file.getName().equals(".") || file.getName().equals(".."))) {
inputStream = ftp.retrieveFileStream(filePath + file.getName());
outputStream = hdfs.create(new Path(outputPath + file.getName()));
IOUtils.copyBytes(inputStream, outputStream, conf, false);
if (inputStream != null) {
inputStream.close();
ftp.completePendingCommand();
}
}
}
ftp.disconnect();
} catch (Exception e) {
flag = false;
e.printStackTrace();
}
return flag;
}
}
然后同样打包上传后执行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,便可看到:
总结
以上所述是小编给大家介绍的hadoop上传文件功能实例代码网站的支持!
来源:http://blog.sina.com.cn/s/blog_9c6852670102wwuz.html


猜你喜欢
- 使用INI配置文件,简单便捷。该辅助工具类为C#操作INI文件的辅助类,源码在某位师傅的基础上完善的来,因为忘记最初的来源了,因此不能提及引
- 每一个app都会有一个”退出登陆”的功能,当点击退出之后需要将所有的Activity都finish掉,开始是想将栈中的所有Activity清
- jasperreport导出的pdf每页显示的记录太少主要是确保Details的高度与Details中Field Text的高度一致。jas
- 由于我经常下载一些pdf格式的电子书,有的时候一些好书下载下来没有书签,读起来感觉没有整体的感觉,所以决定自己写一个小工具,将特定格式的文本
- 本文实例讲述了Java使用synchronized实现互斥锁功能。分享给大家供大家参考,具体如下:代码package per.thread;
- 前言最近被问到了这个问题,第一次回答的也是很不好,在此参考网上答案进行整理记录。供大家学习参考。Synchronized修饰非静态方法Syn
- 具体代码如下所示:***web.xml***<?xml version="1.0" encoding="
- 由于一个项目的需要,我研究了一下android的网络通信方式,大体和java平台的很相似! android平台也提供了很多的AP
- 概述还没玩过Spring Boot,现在越来越多的公司在用了,不得不学习了。本篇是Spring Boot的开篇,简单介绍一下如何创建一个Sp
- 微信聊天现在非常火,是因其界面漂亮吗,哈哈,也许吧。微信每条消息都带有一个气泡,非常迷人,看起来感觉实现起来非常难,其实并不难。下面小编给大
- 本文实例讲述了Java通过在主循环中判断Boolean来停止线程的方法。分享给大家供大家参考,具体如下:package Threads;/*
- 本文基于SpringBoot 2.5.0-M2讲解Spring中Lifecycle和SmartLifecycle的作用和区别,以及如何控制S
- 关于“标签PDF文件(Tagged PDF)标签PDF文件包含描述文档结构和各种文档元素顺序的元数据,是一种包含后端提供
- 一、获取接口请求的数据可以在Interceptor的afterCompletion中实现但是要重写RequestWrapper代码记录如下:
- Maven 多profile及指定编译要点项目A依赖项目B,项目A、B都有对应的多个profile,通过mvn –P参数指定profile,
- 一、HTTP http请求 一般一个http请求包括以下三个部分: 1 请求方法,如get,pos
- 本文实例为大家分享了Android实现摄像头切换,拍照及保存到相册,预览等功能,解决android7拍照之后不能连续预览的问题、参数设置相关
- 最近由于项目需求,需要做一个listview中的item策划删除的效果,与是查找资料和参考了一些相关的博客,终于完美实现了策划删除的效果。先
- 分页插件  MP中自带了分页插件的功能,只需要在配置类中进行简单的配置即可使用分页的相关功能。分页插件常
- 建库建表DROP DATABASE IF EXISTS mp;CREATE DATABASE mp DEFAULT CHARACTER SE