Java 多线程并发编程提高数据处理效率的详细过程
作者:ReadThroughLife 发布时间:2021-06-29 04:19:39
🎉工作场景中遇到这样一个需求:根据主机的 IP 地址联动更新其他模型的相关信息。需求很简单,只涉及一般的数据库联动查询以及更新操作,然而在编码实现过程中发现,由于主机的数量很多,导致循环遍历查询、更新时花费很长的时间,调用一次接口大概需要 30-40 min 时间才能完成操作。
💡因此,为了有效缩短接口方法的执行时间,便考虑使用多线程并发编程方法,利用多核处理器并行执行的能力,通过异步处理数据的方式,便可以大大缩短执行时间,提高执行效率。
📍这里使用可重用固定线程数的线程池 FixedThreadPool
,并使用 CountDownLatch
并发工具类提供的并发流程控制工具作为配合使用,保证多线程并发编程过程中的正常运行:
首先,通过
Runtime.getRuntime().availableProcessors()
方法获取运行机器的 CPU 线程数,用于后续设置固定线程池的线程数量。其次,判断任务的特性,如果为计算密集型任务则设置线程数为
CPU 线程数+1
,如果为 IO 密集型任务则设置线程数为2 * CPU 线程数
,由于在方法中需要与数据库进行频繁的交互,因此属于 IO 密集型任务。之后,对数据进行分组切割,每个线程处理一个分组的数据,分组的组数与线程数保持一致,并且还要创建计数器对象
CountDownLatch
,调用构造函数,初始化参数值为线程数个数,保证主线程等待所有子线程运行结束后,再进行后续的操作。然后,调用
executorService.execute()
方法,重写run
方法编写业务逻辑与数据处理代码,执行完当前线程后记得将计数器减1操作。最后,当所有子线程执行完成后,关闭线程池。
✨在省略工作场景中的业务逻辑代码后,通用的处理方法示例如下所示:
public ResponseData updateHostDept() {
// ...
List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
// split the hostMapList for the following multi-threads task
// return the number of logical CPUs
int processorsNum = Runtime.getRuntime().availableProcessors();
// set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
// if Computing Tasks set the threadNum as (the number of logical CPUs) + 1
int threadNum = processorsNum * 2;
// the number of each group data
int eachGroupNum = hostMapList.size() / threadNum;
List<List<Map>> groupList = new ArrayList<>();
for (int i = 0; i < threadNum; i++) {
int start = i * eachGroupNum;
if (i == threadNum - 1) {
int end = mapList.size();
groupList.add(hostMapList.subList(start, end));
} else {
int end = (i+1) * eachGroupNum;
groupList.add(hostMapList.subList(start, end));
}
}
// update data by using multi-threads asynchronously
ExecutorService executorService = Executors.newFixedThreadPool(threadNum/2);
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
for (List<Map> group : groupList) {
executorService.execute(()->{
try {
for (Map map : group) {
// update the data in mongodb
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// let counter minus one
countDownLatch.countDown();
}
});
}
try {
// main thread donnot execute until all child threads finish
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
// remember to shutdown the threadPool
executorService.shutdown();
return ResponseData.success();
}
🎉那么在使用多线程异步更新的策略后,从当初调用接口所需的大概时间为 30-40 min
下降到了 8-10 min
,大大提高了执行效率。
💡需要注意的是,这里使用的
newFixedThreadPool
创建线程池,它有一个缺陷就是,它的阻塞队列默认是一个 * 队列,默认值为Integer.MAX_VALUE
极有可能会造成 OOM 问题。因此,一般可以使用ThreadPoolExecutor
来创建线程池,自己可以指定等待队列中的线程个数,避免产生 OOM 问题。
public ResponseData updateHostDept() {
// ...
List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
// split the hostMapList for the following multi-threads task
// return the number of logical CPUs
int processorsNum = Runtime.getRuntime().availableProcessors();
// set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
// if Computing Tasks set the threadNum as (the number of logical CPUs) + 1
int threadNum = processorsNum * 2;
// the number of each group data
int eachGroupNum = hostMapList.size() / threadNum;
List<List<Map>> groupList = new ArrayList<>();
for (int i = 0; i < threadNum; i++) {
int start = i * eachGroupNum;
if (i == threadNum - 1) {
int end = mapList.size();
groupList.add(hostMapList.subList(start, end));
} else {
int end = (i+1) * eachGroupNum;
groupList.add(hostMapList.subList(start, end));
}
}
// update data by using multi-threads asynchronously
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 8, 30L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100));
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
for (List<Map> group : groupList) {
executor.execute(()->{
try {
for (Map map : group) {
// update the data in mongodb
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// let counter minus one
countDownLatch.countDown();
}
});
}
try {
// main thread donnot execute until all child threads finish
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
// remember to shutdown the threadPool
executor.shutdown();
return ResponseData.success();
}
在上述的代码中,核心线程数和最大线程数分别为 5 和 8,并没有设置的很大的值,因为如果如果设置的很大,线程间频繁的上下文切换也会增加时间消耗,反而不能最大程度上发挥多线程的优势。至于如何选择合适的参数,需要根据机器的参数以及任务的类型综合考虑决定。
🎉最后补充一点,如果想要通过非编码的方式获取机器的 CPU 线程个数也很简单,windows 系统通过任务管理器,选择 “性能”,便可以查看 CPU 线程个数的情况,如下图所示:
🎉从上图可以看到,我的机器中内核是八个 CPU,但是通过超线程技术一个物理的 CPU 核心可以模拟成两个逻辑 CPU 线程,因此我的机器是支持8核16线程的。
来源:https://blog.csdn.net/weixin_43252521/article/details/127143078


猜你喜欢
- 分类自定义Layout可以分为两种情况。自定义ViewGroup,创造出一些不同于LinearLayout,RelativeLayout等之
- 1、 定义头和根元素部署描述符文件就像所有XML文件一样,必须以一个XML头开始。这个头声明可以使用的XML版本并给出文件的字符编码。DOC
- 本文章向大家介绍JAVA爬取天天基金网数据,主要包括JAVA爬取天天基金网数据使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参
- mybatis #{}无法自动添加引号传入string类型时,无法自动添加引号,导致SQL将值识别为列名,导致SQL失败解决使用map类型代
- 本文实例所述为Android天气预报之解析天气数据的代码,可实现获取HttpGet对象读取天气网站天气数据,并从数据中解析出天气数据,比如温
- 1. Dom概述Dom方式创建XML,应用了标准xml构造器 javax.xml.parsers.DocumentBuilder 来创建 X
- 动态获取对象的性能值,这个在开发过程中经常会遇到,这里我们探讨一下何如高性能的获取属性值。为了对比测试,我们定义一个类Peoplepubli
- 一、背景在通过Runnable接口创建线程时,启动线程需要借助Thread类,这里就涉及到了静态代理模式。二、实例以歌手演出为例,在演出的这
- 最近开发了比较多的接口,因为没有可参考的案例,所以一开始一直按照我的理解进行开发。开发多了发现自己每个结果都写了相同的代码:try() {}
- progressDialog, 它有两个方法dialog.cancel() 和 dialog.dimiss()1. public void
- 今天,简单讲讲android里关于@id和@+id的区别。之前,自己在布局里无论什么情况都使用@+id,可是后来发现有些代码用的是@id,自
- 一、概述UDP和TCP是网络通讯常用的两个传输协议,C#一般可以通过Socket来实现UDP和TCP通讯,由于.NET框架通过UdpClie
- 1.让方法返回多个参数1.1在方法体外定义变量保存结果using System; using System.Collections
- 参数设置java程序启动参数 -D是用来做什么的呢? 官方解释如下:Set a system property value. If valu
- 前台代码: <asp:Button ID="Button1" runat="server" T
- 本文实例为大家分享了C语言实现简单弹跳小球的具体代码,供大家参考,具体内容如下本节利用 printf 函数 实现一个在屏幕上弹跳的小球,内容
- PictureBox 控件可以显示来自位图、图标或者元文件,以及来自增强的元文件、JPEG 或 GIF 文件的图形。如果控件不足以显示整幅图
- 写在前面:可能是临近期末了,各种课程设计接踵而来,最近在csdn上看到2个一样问答,那就是编写一个基于socket的聊天程序,正好最近刚用s
- 前几天客户提需求,对App增加一个功能,这个功能目前市面上已经很常见,那就是应用内切换语言。啥意思,就是 英、中、法、德、日。。。语言随意切
- 本文实例为大家分享了Android弹出菜单效果的具体代码,供大家参考,具体内容如下功能描述:用户单击按钮弹出菜单。当用户选择一个菜单项,会触