Spark调优多线程并行处理任务实现方式
作者:lshan 发布时间:2023-08-21 15:43:53
方式1:
1. 明确 Spark中Job 与 Streaming中 Job 的区别
1.1 Spark Core
一个 RDD DAG Graph 可以生成一个或多个 Job(Action操作)
一个Job可以认为就是会最终输出一个结果RDD的一条由RDD组织而成的计算
Job在spark里应用里是一个被调度的单位
1.2 Streaming
一个 batch 的数据对应一个 DStreamGraph
而一个 DStreamGraph 包含一或多个关于 DStream 的输出操作
每一个输出对应于一个Job,一个 DStreamGraph 对应一个JobSet,里面包含一个或多个Job
2. Streaming Job的并行度
Job的并行度由两个配置决定:
spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs
一个 Batch 可能会有多个 Action 执行,比如注册了多个 Kafka 数据流,每个Action都会产生一个Job
所以一个 Batch 有可能是一批 Job,也就是 JobSet 的概念
这些 Job 由 jobExecutor 依次提交执行
而 JobExecutor 是一个默认池子大小为1的线程池,所以只能执行完一个Job再执行另外一个Job
这里说的池子,大小就是由spark.streaming.concurrentJobs 控制的
concurrentJobs 决定了向 Spark Core 提交Job的并行度
提交一个Job,必须等这个执行完了,才会提交第二个
假设我们把它设置为2,则会并发的把 Job 提交给 Spark Core
Spark 有自己的机制决定如何运行这两个Job,这个机制其实就是FIFO或者FAIR(决定了资源的分配规则)
默认是 FIFO,也就是先进先出,把 concurrentJobs 设置为2,但是如果底层是FIFO,那么会优先执行先提交的Job
虽然如此,如果资源够两个job运行,还是会并行运行两个Job
Spark Streaming 不同Batch任务可以并行计算么 https://developer.aliyun.com/article/73004
conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3") //job 并行对
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))
你会发现,不同batch的job其实也可以并行运行的,这里需要有几个条件:
有延时发生了,batch无法在本batch完成
concurrentJobs > 1
如果scheduler mode 是FIFO则需要某个Job无法一直消耗掉所有资源
Mode是FAIR则尽力保证你的Job是并行运行的,毫无疑问是可以并行的。
方式2:
场景1:
程序每次处理的数据量是波动的,比如周末比工作日多很多,晚八点比凌晨四点多很多。
一个spark程序处理的时间在1-2小时波动是OK的。而spark streaming程序不可以,如果每次处理的时间是1-10分钟,就很蛋疼。
设置10分钟吧,实际上10分钟的也就那一段高峰时间,如果设置每次是1分钟,很多时候会出现程序处理不过来,排队过多的任务延迟更久,还可能出现程序崩溃的可能。
场景2:
程序需要处理的相似job数随着业务的增长越来越多
我们知道spark的api里无相互依赖的stage是并行处理的,但是job之间是串行处理的。
spark程序通常是离线处理,比如T+1之类的延迟,时间变长是可以容忍的。而spark streaming是准实时的,如果业务增长导致延迟增加就很不合理。
spark虽然是串行执行job,但是是可以把job放到线程池里多线程执行的。如何在一个SparkContext中提交多个任务
DStream.foreachRDD{
rdd =>
//创建线程池
val executors=Executors.newFixedThreadPool(rules.length)
//将规则放入线程池
for( ru <- rules){
val task= executors.submit(new Callable[String] {
override def call(): String ={
//执行规则
runRule(ru,spark)
}
})
}
//每次创建的线程池执行完所有规则后shutdown
executors.shutdown()
}
注意点
1.最后需要executors.shutdown()。
如果是executors.shutdownNow()会发生未执行完的task强制关闭线程。
如果使用executors.awaitTermination()则会发生阻塞,不是我们想要的结果。
如果没有这个shutdowm操作,程序会正常执行,但是长时间会产生大量无用的线程池,因为每次foreachRDD都会创建一个线程池。
2.可不可以将创建线程池放到foreachRDD外面?
不可以,这个关系到对于scala闭包到理解,经测试,第一次或者前几次batch是正常的,后面的batch无线程可用。
3.线程池executor崩溃了就会导致数据丢失
原则上是这样的,但是正常的代码一般不会发生executor崩溃。至少我在使用的时候没遇到过。
来源:https://www.cnblogs.com/lshan/p/13356037.html
猜你喜欢
- 近期用到了一位师兄写的C++程序,总体功能良好。使用不同的数据测试,发现了一个明显的缺点:大数据量下,预处理过程耗时很长。中科院的某计算集群
- 1. 前言老板说,明天甲方要来看产品,你得造点数据,而且数据必须是“真”的,演示效果要好看一些,这样他才会买我们的产品,我好明年给你换个嫂子
- Java实现按行读取大文件String file = "F:" + File.separator + "a.t
- 1 前言任何一门语言都需要基本的流程控制语句,其思想也符合人类判断问题或做事的逻辑过程。什么是流程控制呢?流程就是做一件事情的顺序,或者说是
- 模板消息文档公众号的类型分为服务号、订阅号和企业号,其中服务号和订阅号比较常见。要想实现公众号推动消息给指定的用户,其类型必须为服务号。推送
- 微服务通过Feign调用进行密码安全认证在项目中,微服务之间的通信也是通过Feign代理的HTTP客户端通信,为了保护我们的业务微服务不被其
- 1、 流的继承关系,以及字节流和字符流。2、 节点流FileOutputStream和FileInputStream和处理流Buffered
- 在IntelliJ IDEA 中这个查看一个类也就是当前类的所有继承关系,包括实现的所有的接口和继承的类,这个继承,不仅仅是一级的继承关系,
- 前言static和final是两个我们必须掌握的关键字。不同于其他关键字,他们都有多种用法,而且在一定环境下使用,可以提高程序的运行性能,优
- 目录1. 应用场景1.1. 保障线程安全1.2. 显示传递参数2. 实现原理3. 注意事项ThreadLocal是线程私有的局部变量存储容器
- 项目结构项目路径可以自己定义,只要路径映射正确就可以pom.xml <properties> <spring.versio
- 0x00:前言参考之前的《MyBatis 中 SqlMapConfig 配置文件详解》记了一下 MyBatis 中的核心配置文件各个标签的作
- 1、输出矩形以此矩形案例(4行,9列的矩形)为例public static void main(String[] args) {  
- 这篇文章主要介绍了springboot 定时任务@Scheduled实现解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的
- 本文会介绍从一个最基本的java工程,到Web工程,到集成Spring、SpringMVC、Spring
- IDEA 新手使用手册1 简介IDEA的全称是IntelliJ IDEA,这是一个java编程语言开发的集成环境。IDEA的每一个方面都是为
- 当目标数据库不能直连的,需要一个服务器作为中间跳板的时候,我们需要通过SSH通道连接数据库。ps:使用ssh连接,相当于本地开了个端口去连接
- 前几天在跟公司大佬讨论一个问题时,看到他使用Handler的一种方式,旁边的同事在说:以前不是这么用的啊。这个问题引发了我的好奇,虽然当时翻
- 前段时间写了一篇基于mybatis实现的多数据源博客。感觉不是很好,这次打算加入git,来搭建一个基于Mybatis-Plus的多数据源项目
- 1.获取签名与模板进入阿里云平台,进入短信服务模块,在以下位置添加签名和模板(格式一定按照要求填写 审批的比较严格)2.编写模板与签名的枚举