C#基于时间轮调度实现延迟任务详解
作者:a1010 发布时间:2023-07-06 08:00:49
在很多.net开发体系中开发者在面对调度作业需求的时候一般会选择三方开源成熟的作业调度框架来满足业务需求,比如Hangfire、Quartz.NET这样的框架。但是有些时候可能我们只是需要一个简易的延迟任务,这个时候引入这些框架就费力不讨好了。
最简单的粗暴的办法当然是:
Task.Run(async () =>
{
//延迟xx毫秒
await Task.Delay(time);
//业务执行
});
当时作为一个开发者,有时候还是希望使用更优雅的、可复用的一体化方案,比如可以实现一个简易的时间轮来完成基于内存的非核心重要业务的延迟调度。什么是时间轮呢,其实就是一个环形数组,每一个数组有一个插槽代表对应时刻的任务,数组的值是一个任务队列,假设我们有一个基于60秒的延迟时间轮,也就是说我们的任务会在不超过60秒(超过的情况增加分钟插槽,下面会讲)的情况下执行,那么如何实现?下面我们将定义一段代码来实现这个简单的需求
话不多说,撸代码,首先我们需要定义一个时间轮的Model类用于承载我们的延迟任务和任务处理器。简单定义如下:
public class WheelTask<T>
{
public T Data { get; set; }
public Func<T, Task> Handle { get; set; }
}
定义很简单,就是一个入参T代表要执行的任务所需要的入参,然后就是任务的具体处理器Handle。接着我们来定义时间轮本轮的核心代码:
可以看到时间轮其实核心就两个东西,一个是毫秒计时器,一个是数组插槽,这里数组插槽我们使用了字典来实现,key值分别对应0到59秒。每一个插槽的value对应一个任务队列。当添加一个新任务的时候,输入需要延迟的秒数,就会将任务插入到延迟多少秒对应的插槽内,当计时器启动的时候,每一跳刚好1秒,那么就会对插槽计数+1,然后去寻找当前插槽是否有任务,有的话就会调用ExecuteTask执行该插槽下的所有任务。
public class TimeWheel<T>
{
int secondSlot = 0;
DateTime wheelTime { get { return new DateTime(1, 1, 1, 0, 0, secondSlot); } }
Dictionary<int, ConcurrentQueue<WheelTask<T>>> secondTaskQueue;
public void Start()
{
new Timer(Callback, null, 0, 1000);
secondTaskQueue = new Dictionary<int, ConcurrentQueue<WheelTask<T>>>();
Enumerable.Range(0, 60).ToList().ForEach(x =>
{
secondTaskQueue.Add(x, new ConcurrentQueue<WheelTask<T>>());
});
}
public async Task AddTaskAsync(int second, T data, Func<T, Task> handler)
{
var handTime = wheelTime.AddSeconds(second);
if (handTime.Second != wheelTime.Second)
secondTaskQueue[handTime.Second].Enqueue(new WheelTask<T>(data, handler));
else
await handler(data);
}
async void Callback(object o)
{
if (secondSlot != 59)
secondSlot++;
else
{
secondSlot = 0;
}
if (secondTaskQueue[secondSlot].Any())
await ExecuteTask();
}
async Task ExecuteTask()
{
if (secondTaskQueue[secondSlot].Any())
while (secondTaskQueue[secondSlot].Any())
if (secondTaskQueue[secondSlot].TryDequeue(out WheelTask<T> task))
await task.Handle(task.Data);
}
}
接下来就是如果我需要大于60秒的情况如何处理呢。其实就是增加分钟插槽数组,举个例子我有一个任务需要2分40秒后执行,那么当我 插入到时间轮的时候我先插入到分钟插槽,当计时器每过去60秒,分钟插槽值+1,当分钟插槽对应有任务的时候就将这些任务从分钟插槽里弹出再入队到秒插槽中,这样一个任务会先进入插槽值=2(假设从0开始计算)的分钟插槽,计时器运行120秒后分钟值从0累加到2,2插槽的任务弹出到插槽值=40的秒插槽里,当计时器再运行40秒,刚好就可以执行这个延迟2分40秒的任务。话不多说,上代码:
首先我们将任务WheelTask增加一个Second属性,用于当任务从分钟插槽弹出来时需要知道自己入队哪个秒插槽
public class WheelTask<T>
{
...
public int Second { get; set; }
...
}
接着我们再重新定义时间轮的逻辑增加分钟插槽值以及插槽队列的部分
public class TimeWheel<T>
{
int minuteSlot, secondSlot = 0;
DateTime wheelTime { get { return new DateTime(1, 1, 1, 0, minuteSlot, secondSlot); } }
Dictionary<int, ConcurrentQueue<WheelTask<T>>> minuteTaskQueue, secondTaskQueue;
public void Start()
{
new Timer(Callback, null, 0, 1000);、
minuteTaskQueue = new Dictionary<int, ConcurrentQueue<WheelTask<T>>>();
secondTaskQueue = new Dictionary<int, ConcurrentQueue<WheelTask<T>>>();
Enumerable.Range(0, 60).ToList().ForEach(x =>
{
minuteTaskQueue.Add(x, new ConcurrentQueue<WheelTask<T>>());
secondTaskQueue.Add(x, new ConcurrentQueue<WheelTask<T>>());
});
}
...
}
同样的在添加任务的AddTaskAsync函数中我们需要增加分钟,代码改为这样,当大于1分钟的任务会入队到分钟插槽中,小于1分钟的会按原逻辑直接入队到秒插槽中:
public async Task AddTaskAsync(int minute, int second, T data, Func<T, Task> handler)
{
var handTime = wheelTime.AddMinutes(minute).AddSeconds(second);
if (handTime.Minute != wheelTime.Minute)
minuteTaskQueue[handTime.Minute].Enqueue(new WheelTask<T>(handTime.Second, data, handler));
else
{
if (handTime.Second != wheelTime.Second)
secondTaskQueue[handTime.Second].Enqueue(new WheelTask<T>(data, handler));
else
await handler(data);
}
}
最后的部分就是计时器的callback以及任务执行的部分:
async void Callback(object o)
{
bool minuteExecuteTask = false;
if (secondSlot != 59)
secondSlot++;
else
{
secondSlot = 0;
minuteExecuteTask = true;
if (minuteSlot != 59)
minuteSlot++;
else
{
minuteSlot = 0;
}
}
if (minuteExecuteTask || secondTaskQueue[secondSlot].Any())
await ExecuteTask(minuteExecuteTask);
}
async Task ExecuteTask(bool minuteExecuteTask)
{
if (minuteExecuteTask)
while (minuteTaskQueue[minuteSlot].Any())
if (minuteTaskQueue[minuteSlot].TryDequeue(out WheelTask<T> task))
secondTaskQueue[task.Second].Enqueue(task);
if (secondTaskQueue[secondSlot].Any())
while (secondTaskQueue[secondSlot].Any())
if (secondTaskQueue[secondSlot].TryDequeue(out WheelTask<T> task))
await task.Handle(task.Data);
}
基本上基于分钟+秒的时间轮延迟任务核心功能就这些了,聪明的你一定知道如何扩展增加小时,天,月份甚至年份的时间轮了。虽然从代码逻辑上可以实现,但是大部分情况下我们使用时间轮仅仅是完成一些内存易失性的非核心的任务延迟调度,实现天,周,月年意义不是很大。所以基本上到小时就差不多了。再多就上作业系统来调度吧。
来源:https://www.cnblogs.com/gmmy/p/17015538.html
猜你喜欢
- SpringMVC * 介绍springMVC 中的 * 用于拦截控制器方法的执行。先创建出前置需要的一些条件:<a th:href=
- 1. 背景从JDK1.5开始,Java支持个数可变的形参,类似:public class ParamDemo { public static
- 本文实例为大家分享了JavaMail实现带附件的邮件发送的具体代码,供大家参考,具体内容如下发送纯文本的邮件package com.haiw
- 在C#的List集合操作中,有时候需要查找到List集合中的最大值,此时可以使用List集合的扩展方法Max方法,Max方法有2种形式,一种
- 本文实例展示的是一个自定义的定时器组件,有别于.NET Framework里面提供的几个Timer。首先说说该组件开发背景,发现现在手头上的
- 本文实例为大家分享了Java实现图形化界面日历的具体代码,供大家参考,具体内容如下此程序主要功能实现了可以根据用户选择的年月日来定位日期,日
- 一次正常的请求最近别人需要调用我们系统的某一个功能,对方希望提供一个api让其能够更新数据。由于该同学是客户端开发,于是有了类似以下代码。@
- 模板编程是idea的强大功能,也提高了开发人员的编程效率,比如输入main函数:public static void main(String
- 本文实例讲述了基于.net实现裁剪网站上传图片的方法。由于客户端Javascript不能操作文件,所以只能先上传图片再在服务器端剪切。1、上
- 本文实例为大家分享了java实现砸金蛋抽奖的具体代码,供大家参考,具体内容如下代码如下需求:用户每一次砸金蛋,抽中一等奖的概率为2% 二等奖
- 经常要检测某些IP地址范围段的计算机是否在线。有很多的方法,比如进入到网关的交换机上去查询、使用现成的工具或者编写一个简单的DOS脚本等等,
- 本文实例讲述了Android动画之补间动画。分享给大家供大家参考,具体如下:前面讲了《Android动画之逐帧动画(Frame Animat
- 实践过程效果代码public partial class Form1 : Form{ public Form1()
- 一,块作用域首先在深入学习控制结构之前,需要先了解块(block)的概念。块:即复合语句,是指由一对大括号括起来的若干条简单的 Java 语
- C语言实现矩阵运算给定一个n×n的方阵,本题要求计算该矩阵除副对角线、最后一列和最后一行以外的所有元素之和。副对角线为从矩阵的右上角至左下角
- 概述非对称加密算法与对称加密算法的主要差别在于非对称加密算法用于加密和解密的密钥不相同,非对称加密算法密钥分为公钥和私钥,公钥加密只能用私钥
- 一.继承的类型在面向对象的编程中,有两种截然不同继承类型:实现继承和接口继承1.实现继承和接口继承*实现继承:表示一个类型派生于基类型,它拥
- java匿名内部类:1:匿名内部类,匿名内部类也就是没有名字的内部类。2:匿名内部类的作用正因为没有名字,所以匿名内部类只能使用一次,它通常
- 问题场景之前写过一篇文章: 2.@JvmOverloads快捷实现函数重载, 借助于Kotlin的默认参数+@JvmOverloads简化自
- 摘要分析验证码素材图片混淆原理,并采用selenium模拟人拖动滑块过程,进而破解验证码。人工验证的过程1、打开威锋网注册页面2、移动鼠标至