java 中Spark中将对象序列化存储到hdfs
作者:小水熊 发布时间:2022-09-17 04:06:18
标签:java,Spark,对象序列化
java 中Spark中将对象序列化存储到hdfs
摘要: Spark应用中经常会遇到这样一个需求: 需要将JAVA对象序列化并存储到HDFS, 尤其是利用MLlib计算出来的一些模型, 存储到hdfs以便模型可以反复利用. 下面的例子演示了Spark环境下从Hbase读取数据, 生成一个word2vec模型, 存储到hdfs.
废话不多说, 直接贴代码了. spark1.4 + hbase0.98
import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source
object Word2VecDemo {
def convertScanToString(scan: Scan) = {
val proto = ProtobufUtil.toScan(scan)
Base64.encodeBytes(proto.toByteArray)
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.kryoserializer.buffer", "256m")
sparkConf.set("spark.kryoserializer.buffer.max","2046m")
sparkConf.set("spark.akka.frameSize", "500")
sparkConf.set("spark.rpc.askTimeout", "30")
val sc = new SparkContext(sparkConf)
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")
hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")
val scan = new Scan()
val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""")
val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter(
"data".getBytes,
"article".getBytes,
CompareOp.EQUAL,
comp
)
filterList.addFilter(articleFilter)
filterList.addFilter(new PageFilter(100))
scan.setFilter(filterList)
scan.setCaching(50)
scan.setCacheBlocks(false)
hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan))
val crawledRDD = sc.newAPIHadoopRDD(
hbaseConf,
classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result]
)
val articlesRDD = crawledRDD.filter{
case (_,result) => {
val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
content != null
}
}
val wordsInDoc = articlesRDD.map{
case (_,result) => {
val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq
else Seq("")
}
}
val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty)
val word2vec = new Word2Vec()
val model = word2vec.fit(fitleredWordsInDoc)
//---------------------------------------重点看这里-------------------------------------------------------------
//将上面的模型存储到hdfs
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
val fileSystem = FileSystem.get(hadoopConf)
val path = new Path("/user/hadoop/data/mllib/word2vec-object")
val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path)))
oos.writeObject(model)
oos.close
//这里示例另外一个程序直接从hdfs读取序列化对象使用模型
val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path)))
val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
/*
* //你还可以将序列化文件从hdfs放到本地, scala程序使用模型
* import java.io._
* import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
* val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
* val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
* ois.close
*/
//--------------------------------------------------------------------------------------------------------------
}
}
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
来源:https://my.oschina.net/waterbear/blog/525347


猜你喜欢
- name hobbyTom &nb
- 前言Docker旨在提供一种应用程序的自动化部署解决方案,在 Linux 系统上迅速创建一个容器(轻量级虚拟机)并部署和运行应用程序,并通过
- 简述偶然看到一篇关于阿里新orm框架的文章,好奇的点了进去。开发后端多年,看到这个还是有点兴奋的。常用mysql的orm框架mybatis、
- 开发环境使用jdk1.8.0_60,把springboot 项目打成war包后,部署到apache-tomcat-7.0.68时报错如下,换
- RecyclerView 已经出来很久了,但是在项目中之前都使用的是ListView,最近新的项目上了都大量的使用了RecycleView.
- 实例如下:import java.lang.reflect.Field;import java.lang.reflect.Invocatio
- 1.vs中生成dll对应的生成dll的cpp如下 #include<opencv2/opencv.hpp>#inclu
- 前言:学习过我的mall项目的应该知道,mall-admin模块是使用SpringSecurity+JWT来实现登录认证的,而mall-po
- 什么是Java类库在编写程序的时候,通常有很多功能是通用的,或者是很基础的,可以用这些功能来组成更发杂的功能代码。比如文件操作,不同程序对文
- 项目里面用到了语音唤醒功能,前面一直在用讯飞的语音识别,本来打算也是直接用讯飞的语音唤醒,但是讯飞的语音唤醒要收费,试用版只有35天有效期。
- 前言最近在做一个公共相关的内容,公告里边的内容,打算做成配置化的。但是考虑到存储到数据库,需要建立数据库表;存储到配置组件中,担心配置组件存
- 相信大部分使用Intellij的同学都会遇到这个问题,即使项目使用了spring-boot-devtools,修改了类或者html、js等,
- 本文实例讲述了C#遍历操作系统下所有驱动器的方法。分享给大家供大家参考。具体分析如下:这里先通过DriveInfo类的GetDrivers方
- 1.aar包是android studio下打包android工程中src、res、lib后生成的aar文件,aar包导入其他android
- 背景kafka有分区机制,一个主题topic在创建的时候,会设置分区。如果只有一个分区,那所有的消费者都订阅的是这一个分区消息;如果有多个分
- Java GC 机制与内存分配策略详解收集算法是内存回收的方 * ,垃圾收集器是内存回收的具体实现自动内存管理解决的是:给对象分配内存 以及
- 本文实例讲述了C#调用VB进行简繁转换的方法。分享给大家供大家参考。具体分析如下:首先在C#项目中引用Microsoft.VisualBas
- 引入线程是为了减少程序在并发执行时所付出的时空开销。属性:轻型实体。它不拥有系统资源,只是有一点必不可少的、能保证独立运行的资源。独立调度和
- JAVA调用webservice,当你刚开始接触的时候你会觉得它是一个恶梦,特别是没有一个统一的标准实现,比起.net的那些几步
- 前言今天我们来讨论一下,程序中的错误处理。在任何一个稳定的程序中,都会有大量的代码在处理错误,有一些业务错误,我们可以通过主动检查判断来规避