java简单手写版本实现时间轮算法
作者:扶苏l 发布时间:2023-01-05 14:50:38
标签:java,时间轮
时间轮
关于时间轮的介绍,网上有很多,这里就不重复了
核心思想
一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度
超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮的最小精度即为下层时间轮能表达的最大时间(时分秒概念)
每个槽对应一个环形链表存储该时间应该被执行的任务
需要一个线程去驱动指针运转,获取到期任务
以下给出java 简单手写版本实现
代码实现
时间轮主数据结构
/**
* @author apdoer
* @version 1.0
* @date 2021/3/22 19:31
*/
@Slf4j
public class TimeWheel {
/**
* 一个槽的时间间隔(时间轮最小刻度)
*/
private long tickMs;
/**
* 时间 * 小(槽的个数)
*/
private int wheelSize;
/**
* 一轮的时间跨度
*/
private long interval;
private long currentTime;
/**
* 槽
*/
private TimerTaskList[] buckets;
/**
* 上层时间轮
*/
private volatile TimeWheel overflowWheel;
/**
* 一个timer只有一个delayqueue
*/
private DelayQueue<TimerTaskList> delayQueue;
public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) {
this.currentTime = currentTime;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.interval = tickMs * wheelSize;
this.buckets = new TimerTaskList[wheelSize];
this.currentTime = currentTime - (currentTime % tickMs);
this.delayQueue = delayQueue;
for (int i = 0; i < wheelSize; i++) {
buckets[i] = new TimerTaskList();
}
}
public boolean add(TimerTaskEntry entry) {
long expiration = entry.getExpireMs();
if (expiration < tickMs + currentTime) {
//到期了
return false;
} else if (expiration < currentTime + interval) {
//扔进当前时间轮的某个槽里,只有时间大于某个槽,才会放进去
long virtualId = (expiration / tickMs);
int index = (int) (virtualId % wheelSize);
TimerTaskList bucket = buckets[index];
bucket.addTask(entry);
//设置bucket 过期时间
if (bucket.setExpiration(virtualId * tickMs)) {
//设好过期时间的bucket需要入队
delayQueue.offer(bucket);
return true;
}
} else {
//当前轮不能满足,需要扔到上一轮
TimeWheel timeWheel = getOverflowWheel();
return timeWheel.add(entry);
}
return false;
}
private TimeWheel getOverflowWheel() {
if (overflowWheel == null) {
synchronized (this) {
if (overflowWheel == null) {
overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue);
}
}
}
return overflowWheel;
}
/**
* 推进指针
*
* @param timestamp
*/
public void advanceLock(long timestamp) {
if (timestamp > currentTime + tickMs) {
currentTime = timestamp - (timestamp % tickMs);
if (overflowWheel != null) {
this.getOverflowWheel().advanceLock(timestamp);
}
}
}
}
定时器接口
/**
* 定时器
* @author apdoer
* @version 1.0
* @date 2021/3/22 20:30
*/
public interface Timer {
/**
* 添加一个新任务
*
* @param timerTask
*/
void add(TimerTask timerTask);
/**
* 推动指针
*
* @param timeout
*/
void advanceClock(long timeout);
/**
* 等待执行的任务
*
* @return
*/
int size();
/**
* 关闭服务,剩下的无法被执行
*/
void shutdown();
}
定时器实现
/**
* @author apdoer
* @version 1.0
* @date 2021/3/22 20:33
*/
@Slf4j
public class SystemTimer implements Timer {
/**
* 底层时间轮
*/
private TimeWheel timeWheel;
/**
* 一个Timer只有一个延时队列
*/
private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
/**
* 过期任务执行线程
*/
private ExecutorService workerThreadPool;
/**
* 轮询delayQueue获取过期任务线程
*/
private ExecutorService bossThreadPool;
public SystemTimer() {
this.timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue);
this.workerThreadPool = Executors.newFixedThreadPool(100);
this.bossThreadPool = Executors.newFixedThreadPool(1);
//20ms推动一次时间轮运转
this.bossThreadPool.submit(() -> {
for (; ; ) {
this.advanceClock(20);
}
});
}
public void addTimerTaskEntry(TimerTaskEntry entry) {
if (!timeWheel.add(entry)) {
//已经过期了
TimerTask timerTask = entry.getTimerTask();
log.info("=====任务:{} 已到期,准备执行============",timerTask.getDesc());
workerThreadPool.submit(timerTask);
}
}
@Override
public void add(TimerTask timerTask) {
log.info("=======添加任务开始====task:{}", timerTask.getDesc());
TimerTaskEntry entry = new TimerTaskEntry(timerTask, timerTask.getDelayMs() + System.currentTimeMillis());
timerTask.setTimerTaskEntry(entry);
addTimerTaskEntry(entry);
}
/**
* 推动指针运转获取过期任务
*
* @param timeout 时间间隔
* @return
*/
@Override
public synchronized void advanceClock(long timeout) {
try {
TimerTaskList bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
if (bucket != null) {
//推进时间
timeWheel.advanceLock(bucket.getExpiration());
//执行过期任务(包含降级)
bucket.clear(this::addTimerTaskEntry);
}
} catch (InterruptedException e) {
log.error("advanceClock error");
}
}
@Override
public int size() {
//todo
return 0;
}
@Override
public void shutdown() {
this.bossThreadPool.shutdown();
this.workerThreadPool.shutdown();
this.timeWheel = null;
}
}
存储任务的环形链表
/**
* @author apdoer
* @version 1.0
* @date 2021/3/22 19:26
*/
@Data
@Slf4j
class TimerTaskList implements Delayed {
/**
* TimerTaskList 环形链表使用一个虚拟根节点root
*/
private TimerTaskEntry root = new TimerTaskEntry(null, -1);
{
root.next = root;
root.prev = root;
}
/**
* bucket的过期时间
*/
private AtomicLong expiration = new AtomicLong(-1L);
public long getExpiration() {
return expiration.get();
}
/**
* 设置bucket的过期时间,设置成功返回true
*
* @param expirationMs
* @return
*/
boolean setExpiration(long expirationMs) {
return expiration.getAndSet(expirationMs) != expirationMs;
}
public boolean addTask(TimerTaskEntry entry) {
boolean done = false;
while (!done) {
//如果TimerTaskEntry已经在别的list中就先移除,同步代码块外面移除,避免死锁,一直到成功为止
entry.remove();
synchronized (this) {
if (entry.timedTaskList == null) {
//加到链表的末尾
entry.timedTaskList = this;
TimerTaskEntry tail = root.prev;
entry.prev = tail;
entry.next = root;
tail.next = entry;
root.prev = entry;
done = true;
}
}
}
return true;
}
/**
* 从 TimedTaskList 移除指定的 timerTaskEntry
*
* @param entry
*/
public void remove(TimerTaskEntry entry) {
synchronized (this) {
if (entry.getTimedTaskList().equals(this)) {
entry.next.prev = entry.prev;
entry.prev.next = entry.next;
entry.next = null;
entry.prev = null;
entry.timedTaskList = null;
}
}
}
/**
* 移除所有
*/
public synchronized void clear(Consumer<TimerTaskEntry> entry) {
TimerTaskEntry head = root.next;
while (!head.equals(root)) {
remove(head);
entry.accept(head);
head = root.next;
}
expiration.set(-1L);
}
@Override
public long getDelay(TimeUnit unit) {
return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
}
@Override
public int compareTo(Delayed o) {
if (o instanceof TimerTaskList) {
return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
}
return 0;
}
}
存储任务的容器entry
/**
* @author apdoer
* @version 1.0
* @date 2021/3/22 19:26
*/
@Data
class TimerTaskEntry implements Comparable<TimerTaskEntry> {
private TimerTask timerTask;
private long expireMs;
volatile TimerTaskList timedTaskList;
TimerTaskEntry next;
TimerTaskEntry prev;
public TimerTaskEntry(TimerTask timedTask, long expireMs) {
this.timerTask = timedTask;
this.expireMs = expireMs;
this.next = null;
this.prev = null;
}
void remove() {
TimerTaskList currentList = timedTaskList;
while (currentList != null) {
currentList.remove(this);
currentList = timedTaskList;
}
}
@Override
public int compareTo(TimerTaskEntry o) {
return ((int) (this.expireMs - o.expireMs));
}
}
任务包装类(这里也可以将工作任务以线程变量的方式去传入)
@Data
@Slf4j
class TimerTask implements Runnable {
/**
* 延时时间
*/
private long delayMs;
/**
* 任务所在的entry
*/
private TimerTaskEntry timerTaskEntry;
private String desc;
public TimerTask(String desc, long delayMs) {
this.desc = desc;
this.delayMs = delayMs;
this.timerTaskEntry = null;
}
public synchronized void setTimerTaskEntry(TimerTaskEntry entry) {
// 如果这个timetask已经被一个已存在的TimerTaskEntry持有,先移除一个
if (timerTaskEntry != null && timerTaskEntry != entry) {
timerTaskEntry.remove();
}
timerTaskEntry = entry;
}
public TimerTaskEntry getTimerTaskEntry() {
return timerTaskEntry;
}
@Override
public void run() {
log.info("============={}任务执行", desc);
}
}
来源:https://blog.csdn.net/m0_43452671/article/details/115449100
0
投稿
猜你喜欢
- 1、#{}是预编译处理,MyBatis在处理#{ }时,它会将sql中的#{ }替换为?,然后调用PreparedStatement的set
- 前几天工作中一段业务代码需要一个变量每天从1开始递增。为此自己简单的封装了一个线程安全的计数器,可以让一个变量每天从1开始递增。当然了,如果
- 如何配置 * step1: 自定义 * /** * 自定义 * */public class MyInterceptor implemen
- 关于[Cannot determine value type from string ‘xxx’]的
- 本文主要记录JAVA中对象的初始化过程,包括实例变量的初始化和类变量的初始化以及final关键字对初始化的影响。另外,还讨论了由于继承原因,
- 本章先讲解Java随机数的几种产生方式,然后通过示例对其进行演示。广义上讲,Java中的随机数的有三种产生方式:(01). 通过System
- 内网用户或 * 的用户使用 using System.IO; using System.Net; public string get_ht
- 要完成一个轮播图片,首先想到的应该是使用ViewPager来实现。ViewPager已经有了滑动的功能,我们只要让它自己滚动。再加上下方的小
- 双色球选号规则红球是1~33选6个,蓝球1~16选1个。它有17721088种排列组合,这个代码实现了如何将一组双色球号码 转换成第n个排列
- static关键字在Java中,static是静态修饰关键字。用于修饰类的成员方法、类的成员变量,另外可以编写static代码块来优化程序性
- 本文以实例阐述了C++中形参与实参的区别,有助于读者加深对于C++形参与实参的认识。形参出现在函数定义中,在整个函数体内都可以使用, 离开该
- 对象创建的几种方法:使用new关键字使用clone方法反射机制反序列化以上四种都可以产生java对象1,3都会明确的显式的调用构造函数2是在
- 在使用多线程过程中,可能会遇到在一些情况下必须等待子线程全部执行结束后主线程才进行下一步,做法如下: //在使用多线程过程中,可能会遇到在一
- 假如使用绝对路径,没有任何问题,就是移植性不太好。假如使用相对路径,则要注意当前路径“.”是在哪儿?一般我们都会在配置文件中加入log文件的
- 本文实例实现C#以一个收银付费的小程序演示switch case语法如何使用,读入用户选择,把用户的选择赋值给变量n,再根据用户的输入提示付
- 默认情况下Spring Boot使用了内嵌的Tomcat服务器,项目最终被打成jar包运行,每个jar包可以被看作一个独立的Web服务器。传
- 在spring boot里,很吸引人的一个特性是可以直接把应用打包成为一个jar/war,然后这个jar/war是可以直接启动的,而不需要另
- kafka 架构原理大数据时代来临,如果你还不知道Kafka那就真的out了!据统计,有三分之一的世界财富500强企业正在使用K
- 一、前言又见面了哈,今天为大家介绍时钟、钟表的实现方法教程。实现的方法有很多,这里只是提供了一个思路,本着抛砖引玉的心态,希望能和大家共同学
- 一、图示spring再简化:SpringBoot-jar:内嵌tomacat;微服务架构!二、springboot是什么spring是一个为