C#基于时间轮调度实现延迟任务详解
作者:a1010 发布时间:2023-07-06 08:00:49
在很多.net开发体系中开发者在面对调度作业需求的时候一般会选择三方开源成熟的作业调度框架来满足业务需求,比如Hangfire、Quartz.NET这样的框架。但是有些时候可能我们只是需要一个简易的延迟任务,这个时候引入这些框架就费力不讨好了。
最简单的粗暴的办法当然是:
Task.Run(async () =>
{
//延迟xx毫秒
await Task.Delay(time);
//业务执行
});
当时作为一个开发者,有时候还是希望使用更优雅的、可复用的一体化方案,比如可以实现一个简易的时间轮来完成基于内存的非核心重要业务的延迟调度。什么是时间轮呢,其实就是一个环形数组,每一个数组有一个插槽代表对应时刻的任务,数组的值是一个任务队列,假设我们有一个基于60秒的延迟时间轮,也就是说我们的任务会在不超过60秒(超过的情况增加分钟插槽,下面会讲)的情况下执行,那么如何实现?下面我们将定义一段代码来实现这个简单的需求
话不多说,撸代码,首先我们需要定义一个时间轮的Model类用于承载我们的延迟任务和任务处理器。简单定义如下:
public class WheelTask<T>
{
public T Data { get; set; }
public Func<T, Task> Handle { get; set; }
}
定义很简单,就是一个入参T代表要执行的任务所需要的入参,然后就是任务的具体处理器Handle。接着我们来定义时间轮本轮的核心代码:
可以看到时间轮其实核心就两个东西,一个是毫秒计时器,一个是数组插槽,这里数组插槽我们使用了字典来实现,key值分别对应0到59秒。每一个插槽的value对应一个任务队列。当添加一个新任务的时候,输入需要延迟的秒数,就会将任务插入到延迟多少秒对应的插槽内,当计时器启动的时候,每一跳刚好1秒,那么就会对插槽计数+1,然后去寻找当前插槽是否有任务,有的话就会调用ExecuteTask执行该插槽下的所有任务。
public class TimeWheel<T>
{
int secondSlot = 0;
DateTime wheelTime { get { return new DateTime(1, 1, 1, 0, 0, secondSlot); } }
Dictionary<int, ConcurrentQueue<WheelTask<T>>> secondTaskQueue;
public void Start()
{
new Timer(Callback, null, 0, 1000);
secondTaskQueue = new Dictionary<int, ConcurrentQueue<WheelTask<T>>>();
Enumerable.Range(0, 60).ToList().ForEach(x =>
{
secondTaskQueue.Add(x, new ConcurrentQueue<WheelTask<T>>());
});
}
public async Task AddTaskAsync(int second, T data, Func<T, Task> handler)
{
var handTime = wheelTime.AddSeconds(second);
if (handTime.Second != wheelTime.Second)
secondTaskQueue[handTime.Second].Enqueue(new WheelTask<T>(data, handler));
else
await handler(data);
}
async void Callback(object o)
{
if (secondSlot != 59)
secondSlot++;
else
{
secondSlot = 0;
}
if (secondTaskQueue[secondSlot].Any())
await ExecuteTask();
}
async Task ExecuteTask()
{
if (secondTaskQueue[secondSlot].Any())
while (secondTaskQueue[secondSlot].Any())
if (secondTaskQueue[secondSlot].TryDequeue(out WheelTask<T> task))
await task.Handle(task.Data);
}
}
接下来就是如果我需要大于60秒的情况如何处理呢。其实就是增加分钟插槽数组,举个例子我有一个任务需要2分40秒后执行,那么当我 插入到时间轮的时候我先插入到分钟插槽,当计时器每过去60秒,分钟插槽值+1,当分钟插槽对应有任务的时候就将这些任务从分钟插槽里弹出再入队到秒插槽中,这样一个任务会先进入插槽值=2(假设从0开始计算)的分钟插槽,计时器运行120秒后分钟值从0累加到2,2插槽的任务弹出到插槽值=40的秒插槽里,当计时器再运行40秒,刚好就可以执行这个延迟2分40秒的任务。话不多说,上代码:
首先我们将任务WheelTask增加一个Second属性,用于当任务从分钟插槽弹出来时需要知道自己入队哪个秒插槽
public class WheelTask<T>
{
...
public int Second { get; set; }
...
}
接着我们再重新定义时间轮的逻辑增加分钟插槽值以及插槽队列的部分
public class TimeWheel<T>
{
int minuteSlot, secondSlot = 0;
DateTime wheelTime { get { return new DateTime(1, 1, 1, 0, minuteSlot, secondSlot); } }
Dictionary<int, ConcurrentQueue<WheelTask<T>>> minuteTaskQueue, secondTaskQueue;
public void Start()
{
new Timer(Callback, null, 0, 1000);、
minuteTaskQueue = new Dictionary<int, ConcurrentQueue<WheelTask<T>>>();
secondTaskQueue = new Dictionary<int, ConcurrentQueue<WheelTask<T>>>();
Enumerable.Range(0, 60).ToList().ForEach(x =>
{
minuteTaskQueue.Add(x, new ConcurrentQueue<WheelTask<T>>());
secondTaskQueue.Add(x, new ConcurrentQueue<WheelTask<T>>());
});
}
...
}
同样的在添加任务的AddTaskAsync函数中我们需要增加分钟,代码改为这样,当大于1分钟的任务会入队到分钟插槽中,小于1分钟的会按原逻辑直接入队到秒插槽中:
public async Task AddTaskAsync(int minute, int second, T data, Func<T, Task> handler)
{
var handTime = wheelTime.AddMinutes(minute).AddSeconds(second);
if (handTime.Minute != wheelTime.Minute)
minuteTaskQueue[handTime.Minute].Enqueue(new WheelTask<T>(handTime.Second, data, handler));
else
{
if (handTime.Second != wheelTime.Second)
secondTaskQueue[handTime.Second].Enqueue(new WheelTask<T>(data, handler));
else
await handler(data);
}
}
最后的部分就是计时器的callback以及任务执行的部分:
async void Callback(object o)
{
bool minuteExecuteTask = false;
if (secondSlot != 59)
secondSlot++;
else
{
secondSlot = 0;
minuteExecuteTask = true;
if (minuteSlot != 59)
minuteSlot++;
else
{
minuteSlot = 0;
}
}
if (minuteExecuteTask || secondTaskQueue[secondSlot].Any())
await ExecuteTask(minuteExecuteTask);
}
async Task ExecuteTask(bool minuteExecuteTask)
{
if (minuteExecuteTask)
while (minuteTaskQueue[minuteSlot].Any())
if (minuteTaskQueue[minuteSlot].TryDequeue(out WheelTask<T> task))
secondTaskQueue[task.Second].Enqueue(task);
if (secondTaskQueue[secondSlot].Any())
while (secondTaskQueue[secondSlot].Any())
if (secondTaskQueue[secondSlot].TryDequeue(out WheelTask<T> task))
await task.Handle(task.Data);
}
基本上基于分钟+秒的时间轮延迟任务核心功能就这些了,聪明的你一定知道如何扩展增加小时,天,月份甚至年份的时间轮了。虽然从代码逻辑上可以实现,但是大部分情况下我们使用时间轮仅仅是完成一些内存易失性的非核心的任务延迟调度,实现天,周,月年意义不是很大。所以基本上到小时就差不多了。再多就上作业系统来调度吧。
来源:https://www.cnblogs.com/gmmy/p/17015538.html


猜你喜欢
- 1.1、获取http请求参数是一种刚需我想有的小伙伴肯定有过获取http请求的需要,比如想前置获取参数,统计请求数据做服务的接口签名校验敏感
- 最近几天一直在看Hadoop相关的书籍,目前稍微有点感觉,自己就仿照着WordCount程序自己编写了一个统计关联商品。需求描述:根据超市的
- 前言结果映射指的是将数据表中的字段与实体类中的属性关联起来,这样 MyBatis 就可以根据查询到的数据来填充实体对象的属性,帮助我们完成赋
- 在上一篇文章中,我们学习了Camera的基本用法,并借助它们编写了一个例子,实现了类似于API Demos里的图片中轴旋转功能。不过那个例子
- SpringBoot在annotation的层面实现了数据缓存的功能,基于Spring的AOP技术。所有的缓存配置只是在annotation
- file: BluetoothEventLoop.java GB/GB2/GB3: 1. import android.os.PowerMa
- try { using (TransactionScope tr = new Transact
- 判断一个数是不是回文数示例,回文数就是原数与其倒置后的数相等,如:123321,到之后仍为123321,即为回文数题目:一个5位数,判断它是
- android开发中为activity增加左右手势识别,如右滑关闭当前页面。/* * for左右手势 *&n
- 在C#当中,利用WebClient这个核心类,可以轻易的打造一个下载器。但是这里想要强调的是,我们用的是异步操作。所谓异步,是相对于同步的概
- 线程同步的概念由于同一个进程的多个线程共享同一块存储空间,在带来方便的同时,也会带来访问冲突的问题:举例:public class Runn
- 目录SpringBoot整合OpenApiOpenAPI依赖编写配置类改造优化OpenAPI常用注解介绍实体类controller类演示网上
- 本文提纲版本约定JDK:8Servlet:4.xtomcat:9.x✍正文什么样的答案终身难忘?学生时代关于记忆经常能听见两种论调:死记硬背
- 1,LuBan压缩问题 https://github.com/Curzibn/Luban之前选择压缩图片
- 具体代码如下所示:public class Parent { public static int a = parentStati
- 这篇文章主要介绍了Java调用明华RF读写器DLL文件过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,
- Mybatis采用责任链模式,通过 * 组织多个 * (插件),通过这些 * 可以改变Mybatis的默认行为(诸如SQL重写之类的),由
- 前言Java8 由Oracle在2014年发布,是继Java5之后最具革命性的版本。Java8吸收其他语言的精髓带来了函数式编程,lambd
- 一、File --> new -->project二、构建maven项目。三、创建项目名,报名,项目路径。四、选择好maven仓
- 提到类型转换,首先要明确C#中的数据类型,主要分为值类型和引用类型:1.常用的值类型有:(struct)整型家族:int,byte,char