JAVA多线程之实现用户任务排队并预估排队时长
作者:洛阳泰山 发布时间:2022-03-26 03:06:20
实现流程
初始化一定数量的任务处理线程和缓存线程池,用户每次调用接口,开启一个线程处理。
假设初始化5个处理器,代码执行 BlockingQueue.take 时候,每次take都会处理器队列就会减少一个,当处理器队列为空时,take就是阻塞线程,当用户处理某某任务完成时候,调用资源释放接口,在处理器队列put 一个处理器对象,原来阻塞的take ,就继续执行。
排队论简介
排队论是研究系统随机聚散现象和随机系统工作工程的数学理论和方法,又称随机服务系统理论,为运筹学的一个分支。我们下面对排队论做下简化处理,先看下图:
代码具体实现
任务队列初始化 TaskQueue
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 初始化队列及线程池
* @author tarzan
*
*/
@Component
public class TaskQueue {
//处理器队列
public static BlockingQueue<TaskProcessor> taskProcessors;
//等待任务队列
public static BlockingQueue<CompileTask> waitTasks;
//处理任务队列
public static BlockingQueue<CompileTask> executeTasks;
//线程池
public static ExecutorService exec;
//初始处理器数(计算机cpu可用线程数)
public static Integer processorNum=Runtime.getRuntime().availableProcessors();
/**
* 初始化处理器、等待任务、处理任务队列及线程池
*/
@PostConstruct
public static void initEquipmentAndUsersQueue(){
exec = Executors.newCachedThreadPool();
taskProcessors =new LinkedBlockingQueue<TaskProcessor>(processorNum);
//将空闲的设备放入设备队列中
setFreeDevices(processorNum);
waitTasks =new LinkedBlockingQueue<CompileTask>();
executeTasks=new LinkedBlockingQueue<CompileTask>(processorNum);
}
/**
* 将空闲的处理器放入处理器队列中
*/
private static void setFreeDevices(int num) {
//获取可用的设备
for (int i = 0; i < num; i++) {
TaskProcessor dc=new TaskProcessor();
try {
taskProcessors.put(dc);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static CompileTask getWaitTask(Long clazzId) {
return get(TaskQueue.waitTasks,clazzId);
}
public static CompileTask getExecuteTask(Long clazzId) {
return get(TaskQueue.executeTasks,clazzId);
}
private static CompileTask get(BlockingQueue<CompileTask> users, Long clazzId) {
CompileTask compileTask =null;
if (CollectionUtils.isNotEmpty(users)){
Optional<CompileTask> optional=users.stream().filter(e->e.getClazzId().longValue()==clazzId.longValue()).findFirst();
if(optional.isPresent()){
compileTask = optional.get();
}
}
return compileTask;
}
public static Integer getSort(Long clazzId) {
AtomicInteger index = new AtomicInteger(-1);
BlockingQueue<CompileTask> compileTasks = TaskQueue.waitTasks;
if (CollectionUtils.isNotEmpty(compileTasks)){
compileTasks.stream()
.filter(e -> {
index.getAndIncrement();
return e.getClazzId().longValue() == clazzId.longValue();
})
.findFirst();
}
return index.get();
}
//单位秒
public static int estimatedTime(Long clazzId){
return estimatedTime(60,getSort(clazzId)+1);
}
//单位秒
public static int estimatedTime(int cellMs,int num){
int a= (num-1)/processorNum;
int b= cellMs*(a+1);
return b;
}
编译任务类 CompileTask
import lombok.Data;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.gis.common.enums.DataScheduleEnum;
import org.springblade.gis.dynamicds.service.DynamicDataSourceService;
import org.springblade.gis.modules.feature.schedule.service.DataScheduleService;
import java.util.Date;
@Data
public class CompileTask implements Runnable {
//当前请求的线程对象
private Long clazzId;
//用户id
private Long userId;
//当前请求的线程对象
private Thread thread;
//绑定处理器
private TaskProcessor taskProcessor;
//任务状态
private Integer status;
//开始时间
private Date startTime;
//结束时间
private Date endTime;
private DataScheduleService dataScheduleService= SpringUtil.getBean(DataScheduleService.class);
private DynamicDataSourceService dataSourceService= SpringUtil.getBean(DynamicDataSourceService.class);
@Override
public void run() {
compile();
}
/**
* 编译
*/
public void compile() {
try {
//取出一个设备
TaskProcessor taskProcessor = TaskQueue.taskProcessors.take();
//取出一个任务
CompileTask compileTask = TaskQueue.waitTasks.take();
//任务和设备绑定
compileTask.setTaskProcessor(taskProcessor);
//放入
TaskQueue.executeTasks.put(compileTask);
System.out.println(DataScheduleEnum.DEAL_WITH.getName()+" "+userId);
//切换用户数据源
dataSourceService.switchDataSource(userId);
//添加进度
dataScheduleService.addSchedule(clazzId, DataScheduleEnum.DEAL_WITH.getState());
} catch (InterruptedException e) {
System.err.println( e.getMessage());
}
}
}
任务处理器 TaskProcessor
import lombok.Data;
import java.util.Date;
@Data
public class TaskProcessor {
/**
* 释放
*/
public static Boolean release(CompileTask task) {
Boolean flag=false;
Thread thread=task.getThread();
synchronized (thread) {
try {
if(null!=task.getTaskProcessor()){
TaskQueue.taskProcessors.put(task.getTaskProcessor());
TaskQueue.executeTasks.remove(task);
task.setEndTime(new Date());
long intervalMilli = task.getEndTime().getTime() - task.getStartTime().getTime();
flag=true;
System.out.println("用户"+task.getClazzId()+"耗时"+intervalMilli+"ms");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return flag;
}
}
}
Controller控制器接口实现
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springblade.core.tool.api.R;
import org.springblade.gis.multithread.TaskProcessor;
import org.springblade.gis.multithread.TaskQueue;
import org.springblade.gis.multithread.CompileTask;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
@RestController
@RequestMapping("task")
@Api(value = "数据编译任务", tags = "数据编译任务")
public class CompileTaskController {
@ApiOperation(value = "添加等待请求 @author Tarzan Liu")
@PostMapping("compile/{clazzId}")
public R<Integer> compile(@PathVariable("clazzId") Long clazzId) {
CompileTask checkUser=TaskQueue.getWaitTask(clazzId);
if(checkUser!=null){
return R.fail("已经正在排队!");
}
checkUser=TaskQueue.getExecuteTask(clazzId);
if(checkUser!=null){
return R.fail("正在执行编译!");
}
//获取当前的线程
Thread thread=Thread.currentThread();
//创建当前的用户请求对象
CompileTask compileTask =new CompileTask();
compileTask.setThread(thread);
compileTask.setClazzId(clazzId);
compileTask.setStartTime(new Date());
//将当前用户请求对象放入队列中
try {
TaskQueue.waitTasks.put(compileTask);
} catch (InterruptedException e) {
e.printStackTrace();
}
TaskQueue.exec.execute(compileTask);
return R.data(TaskQueue.waitTasks.size()-1);
}
@ApiOperation(value = "查询当前任务前还有多少任务等待 @author Tarzan Liu")
@PostMapping("sort/{clazzId}")
public R<Integer> sort(@PathVariable("clazzId") Long clazzId) {
return R.data(TaskQueue.getSort(clazzId));
}
@ApiOperation(value = "查询当前任务预估时长 @author Tarzan Liu")
@PostMapping("estimate/time/{clazzId}")
public R<Integer> estimatedTime(@PathVariable("clazzId") Long clazzId) {
return R.data(TaskQueue.estimatedTime(clazzId));
}
@ApiOperation(value = "任务释放 @author Tarzan Liu")
@PostMapping("release/{clazzId}")
public R<Boolean> release(@PathVariable("clazzId") Long clazzId) {
CompileTask task=TaskQueue.getExecuteTask(clazzId);
if(task==null){
return R.fail("资源释放异常");
}
return R.status(TaskProcessor.release(task));
}
@ApiOperation(value = "执行 @author Tarzan Liu")
@PostMapping("exec")
public R exec() {
Long start=System.currentTimeMillis();
for (Long i = 1L; i < 100; i++) {
compile(i);
}
System.out.println("消耗时间:"+(System.currentTimeMillis()-start)+"ms");
return R.status(true);
}
}
接口测试
根据任务id查询该任务前还有多少个任务待执行
根据任务id查询该任务预估执行完成的剩余时间,单位秒
补充知识
BlockingQueue
BlockingQueue即阻塞队列,它是基于ReentrantLock,依据它的基本原理,我们可以实现Web中的长连接聊天功能,当然其最常用的还是用于实现生产者与消费者模式,大致如下图所示:
在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。
阻塞与非阻塞
入队
offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false-->不阻塞
put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中断-->阻塞
offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:-->阻塞
被唤醒
等待时间超时
当前线程被中断
出队
poll():如果没有元素,直接返回null;如果有元素,出队
take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断-->阻塞
poll(long timeout, TimeUnit unit):如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:
被唤醒
等待时间超时
当前线程被中断
来源:https://blog.csdn.net/weixin_40986713/article/details/121791751
猜你喜欢
- 一、ArrayList是什么ArrayList 类是一个可以动态修改的数组,与普通数组的区别就是它是没有固定大小的限制,我们可以添加或删除元
- 引言最近在写一个 Mybatis 代码自动生成插件,用的是Mybatis来扩展,其中有一个需求就是 生成javaMapper文件和 xmlM
- 最近要做一个网站,要求实现验证码程序,经过不断调试,终于成功实现功能。一、验证码生成类生成验证码的话需要用到java的Graphics类库,
- 相信大家肯定都在电商网站买过东西,当我们看中一件喜欢又想买的东西时,这时候你又不想这么快结账,这时候你就可以放入购物车;就像我们平时去超市买
- 概述在一个程序执行的过程中,各条语句的执行顺序对程序的结果是有直接影响的。也就是说,程序的流程对运行结果有直接的影响。所以,我们必须清楚每条
- 1. 引入当我们编写一个类时,其实就是在描述其对象的属性和行为,而并没有产生实质上的对象, 只有通过new关键字才会产生出对象,这时系统才会
- 简介ThreadPoolExecutor是一个实现ExecutorService接口的线程池,ExecutorService是主要用来处理多
- List、Set、Map判断两个对象相等的标准List:通过equals()方法比较返回true即可。HashSet:先比较两个对象hash
- SpringMVC在接收集合请求参数时,需要在Controller方法的集合参数里前添加@RequestBody,而@RequestBody
- 本文实例讲述了java中Object类用法。分享给大家供大家参考。具体如下:1、Object类是所有java类的基类如果在类的声明中未使用e
- “无论是什么类型,所有的数据都是一系列的位,即一系列0和1。变量的含义是通过解释这些数据的方式来传达的。”——这句原话是书上翻译的,不过后一
- * 的实现使用的模式:代理模式。代理模式的作用是:为其他对象提供一种代理以控制对这个对象的访问。类似租房的中介。两种 * :(1)jd
- 什么是ServletContext?根据字面意思即Servlet上下文服务器会为每一个工程创建一个对象,这个对象就是ServletConte
- 本文实例为大家分享了Android TextView实现跑马灯效果的具体代码,供大家参考,具体内容如下当Layout中只有一个TextVie
- 第一种方法:同步代码块:作用:把出现线程安全的核心代码上锁原理:每次只能一个线程进入,执行完毕后自行解锁,其他线程才能进来执行锁对象要求:理
- C# 3.0为你提供了对象集合初始化器:/// <summary>/// 图书类/// </summary>publ
- Java RandomAccessFile 指定位置实现文件读取与写入RandomAccessFile是属于随机读取类,是可以对文件本身的内
- 一般,我们的web应用都是只有在用户登录之后才允许操作的,也就是说我们不允许非登录认证的用户直接访问某些页面或功能菜单项。我还记得很久以前我
- 重复参数 Scala在定义函数时允许指定最后一个参数可以重复(变长参数),从而允许函数调用者使用变长参数列表来调用该函数,Scala中使用“
- 使用idea的file-》settings-》plugins安装maven helper插件失败,安装页面总是提示installed,在in