Spark中的数据读取保存和累加器实例详解
作者:欣xy 发布时间:2022-09-13 19:26:46
数据读取与保存
Text文件
对于 Text文件的读取和保存 ,其语法和实现是最简单的,因此我只是简单叙述一下这部分相关知识点,大家可以结合demo具体分析记忆。
1)基本语法
(1)数据读取:textFile(String)
(2)数据保存:saveAsTextFile(String)
2)实现代码demo如下:
object Operate_Text {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/demo.txt")
//3.2 保存数据
inputRDD.saveAsTextFile("textFile")
//4.关闭连接
sc.stop()
}
}
Sequence文件
SequenceFile文件 是Hadoop中用来存储二进制形式的 key-value对 的一种平面文件(Flat File)。在SparkContext中,可以通过调用 sequenceFile[ keyClass,valueClass ] (path) 来调用。
1)基本语法
(1)数据读取:sequenceFile[ keyClass, valueClass ] (path)
(2)数据保存:saveAsSequenceFile(String)
2)实现代码demo如下:
object Operate_Sequence {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 创建rdd
val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2,3),(4,5,6),(7,8,9)))
//3.2 保存数据为SequenceFile
dataRDD.saveAsSequenceFile("seqFile")
//3.3 读取SequenceFile文件
sc.sequenceFile[Int,Int]("seqFile").collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
Object对象文件
对象文件是将对象序列化后保存的文件,采用Hadoop的序列化机制。可以通过 objectFile[ k , v ] (path) 函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用 saveAsObjectFile() 实现对对象文件的输出。因为要序列化所以要指定类型。
1)基本语法
(1)数据读取:objectFile[ k , v ] (path)
(2)数据保存:saveAsObjectFile(String)
2)实现代码demo如下:
object Operate_Object {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 创建RDD
val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),2)
//3.2 保存数据
dataRDD.saveAsObjectFile("objFile")
//3.3 读取数据
sc.objectFile[Int]("objFile").collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
累加器
累加器概念
累加器,是一种变量---分布式共享只写变量。仅支持“add”,支持并发,但Executor和Executor之间不能读数据,可实现所有分片处理时更新共享变量的功能。
累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。
系统累加器
1)累加器定义(SparkContext.accumulator(initialValue)方法)
val sum: LongAccumulator = sc.longAccumulator("sum")
2)累加器添加数据(累加器.add方法)
sum.add(count)
3)累加器获取数据(累加器.value)
sum.value
注意:Executor端的任务不能读取累加器的值(例如:在Executor端调用sum.value,获取的值不是累加器最终的值)。因此我们说,累加器是一个分布式共享只写变量。
4)累加器要放在行动算子中
因为转换算子执行的次数取决于job的数量,如果一个 spark应用 有多个行动算子,那么转换算子中的累加器可能会发生不止一次更新,导致结果错误。所以,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,必须把它放在foreach()这样的行动算子中。
5) 代码实现:
object accumulator_system {
package com.atguigu.cache
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
object accumulator_system {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc = new SparkContext(conf)
val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
//需求:统计a出现的所有次数 ("a",10)
//普通算子实现 reduceByKey 代码会走shuffle 效率低
val rdd: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _)
//累加器实现
//1 声明累加器
val accSum: LongAccumulator = sc.longAccumulator("sum")
dataRDD.foreach{
case (a,count) => {
//2 使用累加器累加 累加器.add()
accSum.add(count)
// 4 不在executor端获取累加器的值,因为得到的值不准确,所以累加器叫分布式共享只写变量
//println("sum = " + accSum.value)
}
}
//3 获取累加器的值 累加器.value
println(("a",accSum.value))
sc.stop()
}
}
来源:https://juejin.cn/post/7159579713738899486


猜你喜欢
- 一、前提 这里的原则只是针对MySQL数据库,其他的数据库某些是殊途同归,某些还是存在差异。我总结的也是MySQL普遍的规则,对于某些特殊情
- css加载器在webpack中,所有的资源(js文件、css文件、模板文件,图片文件等等)都被看成是一个模块,因此多有的资源都是可以被加载的
- 今天在数据分析时遇到了一个小问题,这时才发现自己的基础知识真的不牢固,所以这里记录一下解决方法问题:我在处理完数据后得到的是一个列表,其中放
- 前言在进行一个应用系统的开发过程中,从上到下一般需要四个构件:客户端-业务逻辑层-数据访问层-数据库,其中数据访问层是一个底层、核心的技术。
- 这段时间应老师的要求,给实验室写了一个基于 PyQt5 的小工具。然而源码发过去人家还不要,一定要打包成可执行软件。那就打包呗,刚好以前对
- 一个不错的二级联动下拉菜单源码,您一定会用得到的。运行代码:<html><head><title>Lis
- 从 Google 的一个细节说起:整个虚线框都是“Next”的可点击区域。看似不经意,却直接提升了细节的可用性。其它页码也巧妙地和上面的字母
- 环境准备Python3.6pip install Django==2.0.1pip install celery==4.1.0pip ins
- Embedding的近邻搜索是当前图推荐系统非常重要的一种召回方式,通过item2vec、矩阵分解、双塔DNN等方式都能够产出训练好的use
- 摘要: 三次握手,四次挥手意思是tcp建立连接时需要三次交互来完成,A发起连接A --- SYN --> BA
- 利用字典dict来完成统计举例:a = [1, 2, 3, 1, 1, 2]dict = {}for key in a: dic
- JavaScript 学习 - 提高篇一. JavaScript中的对象.JavaScript中的Object是一组数据的key-value
- python中进行图表绘制的库主要有两个:matplotlib 和 pyecharts, 相比较而言:matplotlib中提供了BaseM
- APScheduler简介在平常的工作中几乎有一半的功能模块都需要定时任务来推动,例如项目中有一个定时统计程序,定时爬出网站的URL程序,定
- 目录概述针对同一类型问题的多种处理方式一、不使用策略模式二、策略模式UML总结示例概述定义一系列算法,将每个算法封装起来。并让它们能够相互替
- 先来看一下该方法的说明create_image(position, **options) [#]Draws an image on the
- 要求:#出租车计费*************************************************************
- 0. 前言深度学习已经成为机器学习中最受欢迎和发展最快的领域。自 2012 年深度学习性能超越机器学习等传统方法以来,深度学习架构开始快速应
- 1 np.arange(),类似于range,通过指定开始值,终值和步长来创建表示等差数列的一维数组,注意该函数和range一样结果不包含终
- 在用python进行图像处理时,有时需要遍历numpy数组,下面是遍历数组的方法:[rows, cols] = num.shape for