Java如何处理延迟任务过程解析
作者:巡山小妖N 发布时间:2022-04-29 15:11:37
1、利用延迟队列
延时队列,第一他是个队列,所以具有对列功能第二就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才能从该延时对列中获取任务否则获取不到……
应用场景比较多,比如延时1分钟发短信,延时1分钟再次执行等,下面先看看延时队列demo之后再看延时队列在项目中的使用:
简单的延时队列要有三部分:第一实现了Delayed接口的消息体、第二消费消息的消费者、第三存放消息的延时队列,那下面就来看看延时队列demo。
一、消息体
package com.delqueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 消息体定义 实现Delayed接口就是实现两个方法即compareTo 和 getDelay最重要的就是getDelay方法,这个方法用来判断是否到期…… */
public class Message implements Delayed {
private int id;
private String body; // 消息内容
private long excuteTime;// 延迟时长,这个是必须的属性因为要按照这个判断延时时长。
public int getId() {
return id;
}
public String getBody() {
return body;
}
public long getExcuteTime() {
return excuteTime;
}
public Message(int id, String body, long delayTime) {
this.id = id;
this.body = body;
this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
}
// 自定义实现比较方法返回 1 0 -1三个参数
@Override
public int compareTo(Delayed delayed) {
Message msg = (Message) delayed;
return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1
: (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);
}
// 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
}
二、消息消费者
package com.delqueue;
import java.util.concurrent.DelayQueue;
public class Consumer implements Runnable {
// 延时队列 ,消费者从其中获取消息进行消费
private DelayQueue<Message> queue;
public Consumer(DelayQueue<Message> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
Message take = queue.take();
System.out.println("消费消息id:" + take.getId() + " 消息体:" + take.getBody());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
三、延时队列
package com.delqueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DelayQueueTest {
public static void main(String[] args) {
// 创建延时队列
DelayQueue<Message> queue = new DelayQueue<Message>();
// 添加延时消息,m1 延时3s
Message m1 = new Message(1, "world", 3000);
// 添加延时消息,m2 延时10s
Message m2 = new Message(2, "hello", 10000);
//将延时消息放到延时队列中
queue.offer(m2);
queue.offer(m1);
// 启动消费线程 消费添加到延时队列中的消息,前提是任务到了延期时间
ExecutorService exec = Executors.newFixedThreadPool(1);
exec.execute(new Consumer(queue));
exec.shutdown();
}
}
将消息体放入延迟队列中,在启动消费者线程去消费延迟队列中的消息,如果延迟队列中的消息到了延迟时间则可以从中取出消息否则无法取出消息也就无法消费。
这就是延迟队列demo,下面我们来说说在真实环境下的使用。
使用场景描述:
在打车软件中对订单进行派单的流程,当有订单的时候给该订单筛选司机,然后给当订单绑定司机,但是有时运气没那么好,订单进来后第一次没有筛选到合适的司机,但我们也不能就此结束派单,而是将该订单的信息放到延时队列中过个2秒钟在进行一次,其实这个2秒钟就是一个延迟,所以这里我们就可以使用延时队列来实现……
下面看看简单的流程图:
下面来看看具体代码实现:
在项目中有如下几个类:第一 、任务类 第二、按照任务类组装的消息体类 第三、延迟队列管理类
任务类即执行筛选司机、绑单、push消息的任务类
package com.test.delayqueue;
/**
* 具体执行相关业务的业务类
* @author whd
* @date 2017年9月25日 上午12:49:32
*/
public class DelayOrderWorker implements Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
//相关业务逻辑处理
System.out.println(Thread.currentThread().getName()+" do something ……");
}
}
消息体类,在延时队列中这个实现了Delayed接口的消息类是比不可少的,实现接口时有一个getDelay(TimeUnit unit)方法,这个方法就是判断是否到期的
这里定义的是一个泛型类,所以可以将我们上面的任务类作为其中的task,这样就将任务类分装成了一个消息体
package com.test.delayqueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 延时队列中的消息体将任务封装为消息体
*
* @author whd
* @date 2017年9月25日 上午12:48:30
* @param <T>
*/
public class DelayOrderTask<T extends Runnable> implements Delayed {
private final long time;
private final T task; // 任务类,也就是之前定义的任务类
/**
* @param timeout
* 超时时间(秒)
* @param task
* 任务
*/
public DelayOrderTask(long timeout, T task) {
this.time = System.nanoTime() + timeout;
this.task = task;
}
@Override
public int compareTo(Delayed o) {
// TODO Auto-generated method stub
DelayOrderTask other = (DelayOrderTask) o;
long diff = time - other.time;
if (diff > 0) {
return 1;
} else if (diff < 0) {
return -1;
} else {
return 0;
}
}
@Override
public long getDelay(TimeUnit unit) {
// TODO Auto-generated method stub
return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
}
@Override
public int hashCode() {
return task.hashCode();
}
public T getTask() {
return task;
}
}
延时队列管理类,这个类主要就是将任务类封装成消息并并添加到延时队列中,以及轮询延时队列从中取出到时的消息体,在获取任务类放到线程池中执行任务
package com.test.delayqueue;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* 延时队列管理类,用来添加任务、执行任务
*
* @author whd
* @date 2017年9月25日 上午12:44:59
*/
public class DelayOrderQueueManager {
private final static int DEFAULT_THREAD_NUM = 5;
private static int thread_num = DEFAULT_THREAD_NUM;
// 固定大小线程池
private ExecutorService executor;
// 守护线程
private Thread daemonThread;
// 延时队列
private DelayQueue<DelayOrderTask<?>> delayQueue;
private static final AtomicLong atomic = new AtomicLong(0);
private final long n = 1;
private static DelayOrderQueueManager instance = new DelayOrderQueueManager();
private DelayOrderQueueManager() {
executor = Executors.newFixedThreadPool(thread_num);
delayQueue = new DelayQueue<>();
init();
}
public static DelayOrderQueueManager getInstance() {
return instance;
}
/**
* 初始化
*/
public void init() {
daemonThread = new Thread(() -> {
execute();
});
daemonThread.setName("DelayQueueMonitor");
daemonThread.start();
}
private void execute() {
while (true) {
Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
System.out.println("当前存活线程数量:" + map.size());
int taskNum = delayQueue.size();
System.out.println("当前延时任务数量:" + taskNum);
try {
// 从延时队列中获取任务
DelayOrderTask<?> delayOrderTask = delayQueue.take();
if (delayOrderTask != null) {
Runnable task = delayOrderTask.getTask();
if (null == task) {
continue;
}
// 提交到线程池执行task
executor.execute(task);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 添加任务
*
* @param task
* @param time
* 延时时间
* @param unit
* 时间单位
*/
public void put(Runnable task, long time, TimeUnit unit) {
// 获取延时时间
long timeout = TimeUnit.NANOSECONDS.convert(time, unit);
// 将任务封装成实现Delayed接口的消息体
DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task);
// 将消息体放到延时队列中
delayQueue.put(delayOrder);
}
/**
* 删除任务
*
* @param task
* @return
*/
public boolean removeTask(DelayOrderTask task) {
return delayQueue.remove(task);
}
}
测试类
package com.delqueue;
import java.util.concurrent.TimeUnit;
import com.test.delayqueue.DelayOrderQueueManager;
import com.test.delayqueue.DelayOrderWorker;
public class Test {
public static void main(String[] args) {
DelayOrderWorker work1 = new DelayOrderWorker();// 任务1
DelayOrderWorker work2 = new DelayOrderWorker();// 任务2
DelayOrderWorker work3 = new DelayOrderWorker();// 任务3
// 延迟队列管理类,将任务转化消息体并将消息体放入延迟对列中等待执行
DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance();
manager.put(work1, 3000, TimeUnit.MILLISECONDS);
manager.put(work2, 6000, TimeUnit.MILLISECONDS);
manager.put(work3, 9000, TimeUnit.MILLISECONDS);
}
}
OK 这就是项目中的具体使用情况,当然具体内容被忽略,整体框架就是这样,还有这里使用java的延时队列但是这种方式是有问题的如果如果down机则会出现任务丢失,所以也可以考虑使用mq、redis来实现
2、mq实现延迟消息
在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。
插件源码地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
插件下载地址:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange
安装:
进入插件安装目录
{rabbitmq-server}/plugins/(可以查看一下当前已存在的插件)
下载插件
rabbitmq_delayed_message_exchange
wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
(如果下载的文件名称不规则就手动重命名一下如:
rabbitmq_delayed_message_exchange-0.0.1.ez)
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
关闭插件
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
插件使用
通过声明一个x-delayed-message类型的exchange来使用delayed-messaging特性
x-delayed-message是插件提供的类型,并不是rabbitmq本身的,发送消息的时候通过在header添加”x-delay”参数来控制消息的延时时间
直接在maven工程的pom.xml文件中加入
接下来在 application.properties 文件中加入redis配置:
也很简单,代码如下:
实现消息发送
x-delay
在这里我设置的延迟时间是3秒。
消息消费者
直接在main方法里运行Spring Boot程序,Spring Boot会自动解析 MessageReceiver 类的。
接下来只需要用Junit运行一下发送消息的接口即可。
运行完后,可以看到如下信息:
消息发送时间:2018-05-03 12:44:53
3秒钟后,Spring Boot控制台会输出:
消息接收时间:2018-05-03 12:44:56
接收到的消息:hello i am delay msg
来源:https://www.cnblogs.com/nxzblogs/p/11666877.html
猜你喜欢
- 本文实例讲述了C#将制定目录文件名转换成大写的方法。分享给大家供大家参考。具体如下:using System;using System.IO
- 前言最近在研究串口通讯,其中有几个比较重要的概念,RS-232这种适配于上位机和PC端进行连接,RS-232只限于PC串口和设备间点对点的通
- 本文实例讲述了Java之JFrame输出Helloworld的方法。分享给大家供大家参考。具体如下:JAVA的GUI程序的基本思路是以JFr
- JAVA用于开发图形界面应用的 SWING 组件包功能强大,使用方便。接下来我们就使用其写一个简单的图形界面小程序:加法计算器。第一步:首先
- 一、前言1、热更新代码的场景(1)当线上服务器出现问题时,有些时候现有的手段不足以发现问题所在,可能需要追加打印日志或者增加一些调试代码,如
- 在J2EE项目的开发中,不管是对底层的数据库操作过程,还是业务层的处理过程,还是控制层的处理过程,都不可避免会遇到各种可预知的、不可预知的异
- SpringCloudStream配置以下配置摘自《SpringCloud微服务实战》,配置主要包括两大部分:Stream配置(基础配置、通
- 在java里, 我们可以使用Executors.newFixedThreadPool 来创建线程池, 然后就可以不停的创建新任务,并用线程池
- 初看 cgaolei 翻译的 Java技巧之双括弧初始化 一文,走马观花,只知用法,未细看后面的解释。蔚为惊艳,心里想 Java 竟然有这么
- 增加了用于处理MyBatis的两个bean:SqlSessionFactoryBean、MapperFactoryBean1、注册SqlSe
- 本文实例为大家分享了java连接SQL Server数据库的具体代码,供大家参考,具体内容如下操作系统:windows 10 64位java
- 查询文档 & 基本操作为了方便学习, 本节中所有示例沿用上节的索引按照ID单个GET class_1/_doc/1查询结果:{ &n
- 现象: 1. 表面现象: 方法中输出的日志, 日志文件中找不到, 也没有任何报错(即@Async标注的方法没有执行, 也没有报错)2. 分析
- 0.前言HashMap简述:HashMap 基于哈希表的 Map 接口实现,是以 key-value 存储形式存在,即主要用来存放键值对。H
- idea中ssm框架的编码问题介绍在idea中编码问题分为几个部分:1 tomcat服务器编码2 页面编码3 控制台编码4 操作系统编码在实
- 对象的读写使用ObjectInputStream和ObjectOutputStream读写对象(序列化与反序列化)。只有字节流没有字符流.类
- 前几天在这里分享了手写 sql 分页查询实现分页,现在来看看使用 mybatis 分页插件 pagehepler 来实现分页使用分页插件的原
- 1. 问题描述:自己修改了下 ${M2_HOME}/conf/settings.xml中的本地repository地址,但是重新执行mvn的
- 前言任何一个服务如果没有监控,那就是两眼一抹黑,无法知道当前服务的运行情况,也就无法对可能出现的异常状况进行很好的处理,所以对任意一个服务来
- Spring Cloud Gateway去掉url前缀主要是增加一个 route,其他配置不变routes: - id: ser