Netty分布式NioEventLoop任务队列执行源码分析
作者:向南是个万人迷 发布时间:2022-10-08 04:07:19
前文传送门:NioEventLoop处理IO事件
执行任务队列
继续回到NioEventLoop的run()方法:
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
//轮询io事件(1)
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
//默认是50
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
//记录下开始时间
final long ioStartTime = System.nanoTime();
try {
//处理轮询到的key(2)
processSelectedKeys();
} finally {
//计算耗时
final long ioTime = System.nanoTime() - ioStartTime;
//执行task(3)
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
//代码省略
}
}
我们看到处理完轮询到的key之后, 首先记录下耗时, 然后通过runAllTasks(ioTime * (100 - ioRatio) / ioRatio)执行taskQueue中的任务
我们知道ioRatio默认是50, 所以执行完ioTime * (100 - ioRatio) / ioRatio后, 方法传入的值为ioTime, 也就是processSelectedKeys()的执行时间:
跟进runAllTasks方法:
protected boolean runAllTasks(long timeoutNanos) {
//定时任务队列中聚合任务
fetchFromScheduledTaskQueue();
//从普通taskQ里面拿一个任务
Runnable task = pollTask();
//task为空, 则直接返回
if (task == null) {
//跑完所有的任务执行收尾的操作
afterRunningAllTasks();
return false;
}
//如果队列不为空
//首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
//执行每一个任务
for (;;) {
safeExecute(task);
//标记当前跑完的任务
runTasks ++;
//当跑完64个任务的时候, 会计算一下当前时间
if ((runTasks & 0x3F) == 0) {
//定时任务初始化到当前的时间
lastExecutionTime = ScheduledFutureTask.nanoTime();
//如果超过截止时间则不执行(nanoTime()是耗时的)
if (lastExecutionTime >= deadline) {
break;
}
}
//如果没有超过这个时间, 则继续从普通任务队列拿任务
task = pollTask();
//直到没有任务执行
if (task == null) {
//记录下最后执行时间
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
//收尾工作
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
首先会执行fetchFromScheduledTaskQueue()这个方法, 这个方法的意思是从定时任务队列中聚合任务, 也就是将定时任务中找到可以执行的任务添加到taskQueue中
我们跟进fetchFromScheduledTaskQueue()方法
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
//从定时任务队列中抓取第一个定时任务
//寻找截止时间为nanoTime的任务
Runnable scheduledTask = pollScheduledTask(nanoTime);
//如果该定时任务队列不为空, 则塞到普通任务队列里面
while (scheduledTask != null) {
//如果添加到普通任务队列过程中失败
if (!taskQueue.offer(scheduledTask)) {
//则重新添加到定时任务队列中
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
//继续从定时任务队列中拉取任务
//方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime()
代表从定时任务初始化到现在过去了多长时间
Runnable scheduledTask= pollScheduledTask(nanoTime)
代表从定时任务队列中拿到小于nanoTime时间的任务, 因为小于初始化到现在的时间, 说明该任务需要执行了
跟到其父类AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
//拿到定时任务队列
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
//peek()方法拿到第一个任务
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
//从队列中删除
scheduledTaskQueue.remove();
//返回该任务
return scheduledTask;
}
return null;
}
我们看到首先获得当前类绑定的定时任务队列的成员变量
如果不为空, 则通过scheduledTaskQueue.peek()弹出第一个任务
如果当前任务小于传来的时间, 说明该任务需要执行, 则从定时任务队列中删除
我们继续回到fetchFromScheduledTaskQueue()方法中:
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
//从定时任务队列中抓取第一个定时任务
//寻找截止时间为nanoTime的任务
Runnable scheduledTask = pollScheduledTask(nanoTime);
//如果该定时任务队列不为空, 则塞到普通任务队列里面
while (scheduledTask != null) {
//如果添加到普通任务队列过程中失败
if (!taskQueue.offer(scheduledTask)) {
//则重新添加到定时任务队列中
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
//继续从定时任务队列中拉取任务
//方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
弹出需要执行的定时任务之后, 我们通过taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失败, 则通过
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)
重新添加到定时任务队列中
如果添加成功, 则通过pollScheduledTask(nanoTime)方法继续添加, 直到没有需要执行的任务
这样就将定时任务队列需要执行的任务添加到了taskQueue中
回到runAllTasks(long timeoutNanos)方法中
protected boolean runAllTasks(long timeoutNanos) {
//定时任务队列中聚合任务
fetchFromScheduledTaskQueue();
//从普通taskQ里面拿一个任务
Runnable task = pollTask();
//task为空, 则直接返回
if (task == null) {
//跑完所有的任务执行收尾的操作
afterRunningAllTasks();
return false;
}
//如果队列不为空
//首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
//执行每一个任务
for (;;) {
safeExecute(task);
//标记当前跑完的任务
runTasks ++;
//当跑完64个任务的时候, 会计算一下当前时间
if ((runTasks & 0x3F) == 0) {
//定时任务初始化到当前的时间
lastExecutionTime = ScheduledFutureTask.nanoTime();
//如果超过截止时间则不执行(nanoTime()是耗时的)
if (lastExecutionTime >= deadline) {
break;
}
}
//如果没有超过这个时间, 则继续从普通任务队列拿任务
task = pollTask();
//直到没有任务执行
if (task == null) {
//记录下最后执行时间
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
//收尾工作
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
首先通过 Runnable task = pollTask() 从taskQueue中拿一个任务
任务不为空, 则通过
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos
计算一个截止时间, 任务的执行时间不能超过这个时间
然后在for循环中通过safeExecute(task)执行task
我们跟到safeExecute(task)中:
protected static void safeExecute(Runnable task) {
try {
//直接调用run()方法执行
task.run();
} catch (Throwable t) {
//发生异常不终止
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
这里直接调用task的run()方法进行执行, 其中发生异常, 只打印一条日志, 代表发生异常不终止, 继续往下执行
回到runAllTasks(long timeoutNanos)方法
protected boolean runAllTasks(long timeoutNanos) {
//定时任务队列中聚合任务
fetchFromScheduledTaskQueue();
//从普通taskQ里面拿一个任务
Runnable task = pollTask();
//task为空, 则直接返回
if (task == null) {
//跑完所有的任务执行收尾的操作
afterRunningAllTasks();
return false;
}
//如果队列不为空
//首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
//执行每一个任务
for (;;) {
safeExecute(task);
//标记当前跑完的任务
runTasks ++;
//当跑完64个任务的时候, 会计算一下当前时间
if ((runTasks & 0x3F) == 0) {
//定时任务初始化到当前的时间
lastExecutionTime = ScheduledFutureTask.nanoTime();
//如果超过截止时间则不执行(nanoTime()是耗时的)
if (lastExecutionTime >= deadline) {
break;
}
}
//如果没有超过这个时间, 则继续从普通任务队列拿任务
task = pollTask();
//直到没有任务执行
if (task == null) {
//记录下最后执行时间
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
//收尾工作
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
每次执行完task, runTasks自增
这里 if ((runTasks & 0x3F) == 0) 代表是否执行了64个任务, 如果执行了64个任务, 则会通过 lastExecutionTime = ScheduledFutureTask.nanoTime() 记录定时任务初始化到现在的时间, 如果这个时间超过了截止时间, 则退出循环
如果没有超过截止时间, 则通过 task = pollTask() 继续弹出任务执行
这里执行64个任务统计一次时间, 而不是每次执行任务都统计, 主要原因是因为获取系统时间是个比较耗时的操作, 这里是netty的一种优化方式
如果没有task需要执行, 则通过afterRunningAllTasks()做收尾工作, 最后记录下最后的执行时间
章节小结
本章学习了有关NioEventLoopGroup的创建, NioEventLoop的创建和启动, 以及多路复用器的轮询处理和task执行的相关逻辑, 通过本章学习, 我们应该掌握如下内容:
1. NioEventLoopGroup如何选择分配NioEventLoop
2. NioEventLoop如何开启
3. NioEventLoop如何进行select操作
4. NioEventLoop如何执行task
来源:https://www.cnblogs.com/xiangnan6122/p/10203169.html


猜你喜欢
- 在类中自定义的“函数”称为“方法”,由于C#是完全面向对象的
- 运行原理1、不同线程中所包含的栈帧是不允许存在相互引用的。2、如果当前方法调用了其他方法,方法返回之际,当前栈帧会传回此方法的执行结果给当前
- C#实现委托namespace Delegate{ delegate void DGSayiHi(string n
- finalize方法是什么finalize方法是Object的protected方法,Object的子类们可以覆盖该方法以实现资源清理工作,
- 背景后台系统需要接入 企业微信登入,满足企业员工快速登入系统流程图简单代码说明自定义一套 springsecurity 认证逻辑主要就是 根
- 操作字符串的类都有哪些?区别是什么?操作字符串的类主要用三个,分别是String类,StringBuffer类和StringBuilder类
- 一、引入类型别名当配置 XML 文件,需要指明Java类型时,类型别名可替代Java类型的全名,一般会设置一个简单缩写的类型别名去替代它,用
- JPA连接到数据库,调用存储过程,这样的需求很常见。本文就针对这一点,讲述如何使用spring Data JPA调用存储过程的方法。1、存储
- 前言由于不小心将and或者or写在了语句后面,导致mybatis无法自主判别,这种问题在新上手的同学中很是常见。下面我们探讨一下,在哪些情况
- 概述工作电脑用了3年多了,100G的C盘也快吃不消了,每次打开看到C盘红了,总要用清理工具清理一下子.不知道怎么最近清理工具清理
- 在面向对象编程中,SOLID 是五个设计原则的首字母缩写,旨在使软件设计更易于理解、灵活和可维护。这些原则是由美国软件工程师和讲师
- 今天讲解一下Fragment的控制,主要是切换View和页面替换等操作。还有就是如何获取Fragment的管理对象,以及与Activity的
- 高分配速率(High Allocation Rate)分配速率(Allocation rate)表示单位时间内分配的内存量。通常使用&nbs
- 1.选择一个WebService接口作测试假设 WebService url 为 http://ws.webxml.com.cn/WebSe
- 小伙伴们在使用ICP提供的各种能力进行集成开发时常常会遇到一些技术上的困扰,例如ICP中很多接口是通过OCX控件的方式提供的,如何调用这些接
- 目录前言1、创建一个控制台应用程序2、编写测试代码并分析3、总结前言对于C#里面的Foreach学过 语言的人都知道怎么用,但是其原理相信很
- /**Bitmap放大的方法*/ private static Bitmap big(Bitmap bitmap) { Matrix mat
- 详解Java虚拟机管理的内存运行时数据区域概述 Java虚拟机在执行Java程序的过程中会把它所管理的内
- delegate double ProcessDelegate(double param1, double param2); &n
- 本文介绍了SpringCloud +Zookeeper完成配置中心,分享给大家,具有如下:使用场景项目配置更改不需要打包,重启提供配置文件的