Spark调优多线程并行处理任务实现方式
作者:lshan 发布时间:2023-08-21 15:43:53
方式1:
1. 明确 Spark中Job 与 Streaming中 Job 的区别
1.1 Spark Core
一个 RDD DAG Graph 可以生成一个或多个 Job(Action操作)
一个Job可以认为就是会最终输出一个结果RDD的一条由RDD组织而成的计算
Job在spark里应用里是一个被调度的单位
1.2 Streaming
一个 batch 的数据对应一个 DStreamGraph
而一个 DStreamGraph 包含一或多个关于 DStream 的输出操作
每一个输出对应于一个Job,一个 DStreamGraph 对应一个JobSet,里面包含一个或多个Job
2. Streaming Job的并行度
Job的并行度由两个配置决定:
spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs
一个 Batch 可能会有多个 Action 执行,比如注册了多个 Kafka 数据流,每个Action都会产生一个Job
所以一个 Batch 有可能是一批 Job,也就是 JobSet 的概念
这些 Job 由 jobExecutor 依次提交执行
而 JobExecutor 是一个默认池子大小为1的线程池,所以只能执行完一个Job再执行另外一个Job
这里说的池子,大小就是由spark.streaming.concurrentJobs 控制的
concurrentJobs 决定了向 Spark Core 提交Job的并行度
提交一个Job,必须等这个执行完了,才会提交第二个
假设我们把它设置为2,则会并发的把 Job 提交给 Spark Core
Spark 有自己的机制决定如何运行这两个Job,这个机制其实就是FIFO或者FAIR(决定了资源的分配规则)
默认是 FIFO,也就是先进先出,把 concurrentJobs 设置为2,但是如果底层是FIFO,那么会优先执行先提交的Job
虽然如此,如果资源够两个job运行,还是会并行运行两个Job
Spark Streaming 不同Batch任务可以并行计算么 https://developer.aliyun.com/article/73004
conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3") //job 并行对
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))
你会发现,不同batch的job其实也可以并行运行的,这里需要有几个条件:
有延时发生了,batch无法在本batch完成
concurrentJobs > 1
如果scheduler mode 是FIFO则需要某个Job无法一直消耗掉所有资源
Mode是FAIR则尽力保证你的Job是并行运行的,毫无疑问是可以并行的。
方式2:
场景1:
程序每次处理的数据量是波动的,比如周末比工作日多很多,晚八点比凌晨四点多很多。
一个spark程序处理的时间在1-2小时波动是OK的。而spark streaming程序不可以,如果每次处理的时间是1-10分钟,就很蛋疼。
设置10分钟吧,实际上10分钟的也就那一段高峰时间,如果设置每次是1分钟,很多时候会出现程序处理不过来,排队过多的任务延迟更久,还可能出现程序崩溃的可能。
场景2:
程序需要处理的相似job数随着业务的增长越来越多
我们知道spark的api里无相互依赖的stage是并行处理的,但是job之间是串行处理的。
spark程序通常是离线处理,比如T+1之类的延迟,时间变长是可以容忍的。而spark streaming是准实时的,如果业务增长导致延迟增加就很不合理。
spark虽然是串行执行job,但是是可以把job放到线程池里多线程执行的。如何在一个SparkContext中提交多个任务
DStream.foreachRDD{
rdd =>
//创建线程池
val executors=Executors.newFixedThreadPool(rules.length)
//将规则放入线程池
for( ru <- rules){
val task= executors.submit(new Callable[String] {
override def call(): String ={
//执行规则
runRule(ru,spark)
}
})
}
//每次创建的线程池执行完所有规则后shutdown
executors.shutdown()
}
注意点
1.最后需要executors.shutdown()。
如果是executors.shutdownNow()会发生未执行完的task强制关闭线程。
如果使用executors.awaitTermination()则会发生阻塞,不是我们想要的结果。
如果没有这个shutdowm操作,程序会正常执行,但是长时间会产生大量无用的线程池,因为每次foreachRDD都会创建一个线程池。
2.可不可以将创建线程池放到foreachRDD外面?
不可以,这个关系到对于scala闭包到理解,经测试,第一次或者前几次batch是正常的,后面的batch无线程可用。
3.线程池executor崩溃了就会导致数据丢失
原则上是这样的,但是正常的代码一般不会发生executor崩溃。至少我在使用的时候没遇到过。
来源:https://www.cnblogs.com/lshan/p/13356037.html


猜你喜欢
- Java语言的关键字,当它用来修饰一个方法或者一个代码块的时候,能够保证在同一时刻最多只有一个线程执行该段代码。一、当两个并发线程访问同一个
- 1. 概述JDK * 是利用java反射机制 生成一个实现接口的匿名类, 在调用具体方法前调用InvocationHandler来处理Cg
- 线程可以划分优先级,优先级高的线程得到的CPU资源比较多,也就是CPU优先执行优先级高的线程对象中的任务。设置线程优先级有助于帮助线程规划器
- TreeMap 的实现就是红黑树数据结构,也就说是一棵自平衡的排序二叉树,这样就可以保证当需要快速检索指定节点。TreeSet 和 Tree
- 最近因为工作的原因用到了西门子PLC,在使用过程中一直在思考上位机和PLC的通讯问题,后来上网查了一下,找到了一个专门针对S7开发的一个.n
- 单例模式单例模式顾名思义就是单一的实例,涉及到一个单一的类,该类负责创建自己的对象,同时确保只有一个对象被创建,并且提供一种可以访问这个对象
- 本文实例为大家分享了flutter实现不规则底部导航栏的具体代码,供大家参考,具体内容如下实现底部导航栏并点击切换页面可简述为有三种方式Ta
- 什么是FlutterFlutter 是谷歌推出的开发移动UI框架,可以快速的在IOS和Android上构建高质量的原生用户界面。Flutte
- 一、springboot 自动配置原理先说说我们自己的应用程序中Bean加入容器的办法:package com.ynunicom.dc.di
- 单独一个变量直接使用 @a 的形式,无需加分号,一般是直接使用已有变量,注意在使用 html 标签时
- 关于tomcat热部署on ‘update' action:【update】时,执行的操作。on frame deactivatio
- 一、前言系统执行业务逻辑之前,会对输入数据进行校验,检测数据是否有效合法的。所以我们可能会写大量的if else等判断逻辑,特别是在不同方法
- 关于在Android中实现ListView的弹性效果,有很多不同的方法,网上一搜,也有很多,下面贴出在项目中经常用到的两种实现ListVie
- 本文实例讲述了C#判断一天、一年已经过了百分之多少的方法。分享给大家供大家参考。具体如下:这里写了四个函数,分别是1.判断当前时间过了今天的
- ArrayList实现班级信息管理系统,供大家参考,具体内容如下代码如下:import java.util.*;public class D
- 前言如果你了解过 Liunx ,了解过 Liunx 的中管道命令 | ,那么你会发现,其实 Java 8 的 stream 和 Liunx
- springboot项目出现”java: 错误: 无效的源发行版:17“问题解决方案下面是报错页面问
- RequestBody注解的List参数传递Controller方法参数:@RequestBody List<Long> ids
- Java Double相加出现的怪事问题的提出编译运行下面这个程序会看到什么public class test { public stati
- Jenkins是一个java开发的、开源的、非常好用持续集成的工具,它能帮我们实现自动化部署环境、测试、打包等等的工作,还可以在构建任务成功