Netty分布式ByteBuf使用SocketChannel读取数据过程剖析
作者:向南是个万人迷 发布时间:2023-04-28 03:15:42
我们第三章分析过客户端接入的流程, 这一小节带大家剖析客户端发送数据, Server读取数据的流程:
首先温馨提示, 这一小节高度耦合第三章的第1, 2节的内容, 很多知识这里并不会重复讲解, 如果对之前的知识印象不深刻建议恶补第三章的第1, 2节的内容之后再学习这一小节
传送门:
初始化NioSockectChannelConfig
处理接入事件之handle的创建
Server读取数据的流程
我们首先看NioEventLoop的processSelectedKey方法
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//获取到channel中的unsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//如果这个key不是合法的, 说明这个channel可能有问题
if (!k.isValid()) {
//代码省略
}
try {
//如果是合法的, 拿到key的io事件
int readyOps = k.readyOps();
//链接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//读事件和接受链接事件
//如果当前NioEventLoop是work线程的话, 这里就是op_read事件
//如果是当前NioEventLoop是boss线程的话, 这里就是op_accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
这里的判断表示轮询到大事件是op_read或者op_accept事件
之前的章节分析过, 如果当前NioEventLoop是work线程的话, 那么这里就是op_read事件, 也就是读事件, 表示客户端发来了数据流
这里会调用unsafe的redis()方法进行读取
如果是work线程, 那么这里的channel是NioServerSocketChannel, 其绑定的unsafe是NioByteUnsafe, 这里会走进NioByteUnsafe的read()方法中:
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
首先获取SocketChannel的config, pipeline等相关属性
final ByteBufAllocator allocator = config.getAllocator(); 这一步是获取一个ByteBuf的内存分配器, 用于分配ByteBuf
这里会走到DefaultChannelConfig的getAllocator方法中
public ByteBufAllocator getAllocator() {
return allocator;
}
这里返回的DefualtChannelConfig的成员变量, 我们看这个成员变量:
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
这里调用ByteBufAllocator的属性DEFAULT, 跟进去:
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
我们看到这里又调用了ByteBufUtil的静态属性DEFAULT_ALLOCATOR, 再跟进去:
static final ByteBufAllocator DEFAULT_ALLOCATOR;
DEFAULT_ALLOCATOR这个属性是在static块中初始化的
我们跟到static块中
static {
String allocType = SystemPropertyUtil.get(
"io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
allocType = allocType.toLowerCase(Locale.US).trim();
ByteBufAllocator alloc;
if ("unpooled".equals(allocType)) {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else if ("pooled".equals(allocType)) {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
}
DEFAULT_ALLOCATOR = alloc;
//代码省略
}
首先判断运行环境是不是安卓, 如果不是安卓, 在返回"pooled"字符串保存在allocType中
然后通过if判断, 最后局部变量alloc = PooledByteBufAllocator.DEFAULT, 最后将alloc赋值到成员变量DEFAULT_ALLOCATOR
我们跟到PooledByteBufAllocator的DEFAULT属性中:
public static final PooledByteBufAllocator DEFAULT =
new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
我们看到这里直接通过new的方式, 创建了一个PooledByteBufAllocator对象, 也就是基于申请一块连续内存进行缓冲区分配的缓冲区分配器
缓冲区分配器的知识, 我们之前小节进行了详细的剖析, 这里就不再赘述
回到NioByteUnsafe的read()方法中
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
这里 ByteBufAllocator allocator = config.getAllocator()中的allocator , 就是PooledByteBufAllocator
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle() 是创建一个handle, 我们之前的章节讲过, handle是对RecvByteBufAllocator进行实际操作的对象
我们跟进recvBufAllocHandle
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
//如果不存在, 则创建一个handle的实例
if (recvHandle == null) {
recvHandle = config().getRecvByteBufAllocator().newHandle();
}
return recvHandle;
}
这里是我们之前剖析过的逻辑, 如果不存在, 则创建handle的实例, 具体创建过程我们可以回顾第三章的第二小节, 这里就不再赘述
同样allocHandle.reset(config)是将配置重置, 第三章的第二小节也对其进行过剖析
重置完配置之后, 进行do-while循环, 有关循环终止条件allocHandle.continueReading(), 之前小节也有过详细剖析, 这里也不再赘述
在do-while循环中, 首先看 byteBuf = allocHandle.allocate(allocator) 这一步, 这里传入了刚才创建的allocate对象, 也就是PooledByteBufAllocator:
这里会跑到DefaultMaxMessagesRecvByteBufAllocator类的allocate方法中:
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
这里的guess方法, 会调用AdaptiveRecvByteBufAllocator的guess方法:
public int guess() {
return nextReceiveBufferSize;
}
这里会返回AdaptiveRecvByteBufAllocator的成员变量nextReceiveBufferSize, 也就是下次所分配缓冲区的大小, 根据我们之前学习的内容, 第一次分配的时候会分配初始大小, 也就是1024字节
回到DefaultMaxMessagesRecvByteBufAllocator类的allocate方法中:
这样, alloc.ioBuffer(guess())就会分配一个PooledByteBuf
我们跟到AbstractByteBufAllocator的ioBuffer方法中:
public ByteBuf ioBuffer(int initialCapacity) {
if (PlatformDependent.hasUnsafe()) {
return directBuffer(initialCapacity);
}
return heapBuffer(initialCapacity);
}
这里首先判断是否能获取jdk的unsafe对象, 默认为true, 所以会走到directBuffer(initialCapacity)中, 这里最终会分配一个PooledUnsafeDirectByteBuf对象, 具体分配流程我们再之前小节做过详细剖析
回到NioByteUnsafe的read()方法中:
分配完了ByteBuf之后, 再看这一步allocHandle.lastBytesRead(doReadBytes(byteBuf)):
首先看参数doReadBytes(byteBuf)方法, 这步是将channel中的数据读取到我们刚分配的ByteBuf中, 并返回读取到的字节数
这里会调用到NioSocketChannel的doReadBytes方法:
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
首先拿到绑定在channel中的handler, 因为我们已经创建了handle, 所以这里会直接拿到
再看allocHandle.attemptedBytesRead(byteBuf.writableBytes())这步, byteBuf.writableBytes()返回byteBuf的可写字节数, 也就是最多能从channel中读取多少字节写到ByteBuf, allocate的attemptedBytesRead会把可写字节数设置到DefaultMaxMessagesRecvByteBufAllocator 类的attemptedBytesRead属性中
跟到DefaultMaxMessagesRecvByteBufAllocator中的attemptedBytesRead我们会看到:
public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
继续看doReadBytes方法
最后, 通过byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead())将jdk底层的channel中的数据写入到我们创建的ByteBuf中, 并返回实际写入的字节数
回到NioByteUnsafe的read()方法中:
继续看allocHandle.lastBytesRead(doReadBytes(byteBuf))这步
刚才我们剖析过doReadBytes(byteBuf)返回的是世界写入ByteBuf的字节数
再看lastBytesRead方法, 跟到DefaultMaxMessagesRecvByteBufAllocator的lastBytesRead方法中:
public final void lastBytesRead(int bytes) {
lastBytesRead = bytes;
totalBytesRead += bytes;
if (totalBytesRead < 0) {
totalBytesRead = Integer.MAX_VALUE;
}
}
这里会赋值两个属性, lastBytesRead代表最后读取的字节数, 这里赋值为我们刚才写入ByteBuf的字节数, totalBytesRead表示总共读取的字节数, 这里将写入的字节数追加
继续看NioByteUnsafe的read()方法:
如果最后一次读取数据为0, 说明已经将channel中的数据全部读取完毕, 将新创建的ByteBuf释放循环利用, 并跳出循环
allocHandle.incMessagesRead(1)这步是增加消息的读取次数, 因为我们循环最多16次, 所以当增加消息次数增加到16会结束循环
读取完毕之后, 会通过pipeline.fireChannelRead(byteBuf)将传递channelRead事件, 有关channelRead事件, 我们在第四章也进行了详细的剖析
这里读者会有疑问, 如果一次读取不完, 就传递channelRead事件, 那么server接收到的数据有可能就是不完整的, 其实关于这点, netty也做了相应的处理, 我们会在之后的章节详细剖析netty的半包处理机制
循环结束后, 会执行到allocHandle.readComplete()这一步
我们知道第一次分配ByteBuf的初始容量是1024, 但是初始容量不一定一定满足所有的业务场景, netty中, 将每次读取数据的字节数进行记录, 然后之后次分配ByteBuf的时候, 容量会尽可能的符合业务场景所需要大小, 具体实现方式, 就是在readComplete()这一步体现的
我们跟到AdaptiveRecvByteBufAllocator的readComplete()方法中:
public void readComplete() {
record(totalBytesRead());
}
这里调用了record方法, 并且传入了这一次所读取的字节总数
跟到record方法中
private void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
if (decreaseNow) {
index = Math.max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
index = Math.min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
首先看判断条件 if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)])
这里index是当前分配的缓冲区大小所在的SIZE_TABLE中的索引, 将这个索引进行缩进, 然后根据缩进后的所以找出SIZE_TABLE中所存储的内存值, 再判断是否大于等于这次读取的最大字节数, 如果条件成立, 说明分配的内存过大, 需要缩容操作, 我们看if块中缩容相关的逻辑
首先 if (decreaseNow) 会判断是否立刻进行收缩操作, 通常第一次不会进行收缩操作, 然后会将decreaseNow设置为true, 代表下一次直接进行收缩操作
假设需要立刻进行收缩操作, 我们看收缩操作的相关逻辑:
index = Math.max(index - INDEX_DECREMENT, minIndex) 这一步将索引缩进一步, 但不能小于最小索引值
然后通过 nextReceiveBufferSize = SIZE_TABLE[index] 获取设置索引之后的内存, 赋值在nextReceiveBufferSize, 也就是下次需要分配的大小, 下次就会根据这个大小分配ByteBuf了, 这样就实现了缩容操作
再看 else if (actualReadBytes >= nextReceiveBufferSize)
这里判断这次读取字节的总量比上次分配的大小还要大, 则进行扩容操作
扩容操作也很简单, 索引步进, 然后拿到步进后的索引所对应的内存值, 作为下次所需要分配的大小
再NioByteUnsafe的read()方法中:
经过了缩容或者扩容操作之后, 通过pipeline.fireChannelReadComplete()传播ChannelReadComplete()事件
章节总结
本章主要剖析了ByteBuf的基本操作以及缓冲区分配等相关知识.
缓冲区分配, 分为通过调用jdk的api的方式和分配一块连续内存的方式
其中, 通过分配连续内存的方式分配缓冲区中, 又介绍了在page级别分配的逻辑和在subpage级别分配的逻辑
page级别分配时通过操作内存二叉树的方式记录分配情况
subpage级别分配是通过位图的方式记录分配情况
最后介绍了NioSocketChannel处理读事件的相关逻辑
总体来说, 这一章的内容难度是比较大的, 希望同学课后通过多调试的方式进行熟练掌握
来源:https://www.cnblogs.com/xiangnan6122/p/10205889.html


猜你喜欢
- 项目中经常遇到分数统计的需求,例如我们执行了某项操作或做了某个题目,操作正确则计分,相反则不计分失去该项分数,为了应对需求需要一个分数统计系
- 环境配置:Jdk1.8 + Tomcat8.5 + mysql + Eclispe(IntelliJ IDEA,Eclispe,MyEcli
- 概述在实际项目开发中如果需要支持多语言,我们需要整理项目中所有的字符串并翻译成对应的语种放在相应的文件夹下,就像这样最让我们头痛的是我们得一
- association和collection用法1.单个关联查询association1.1实体之间的关联表示package com.wor
- Spring Security中也提供了默认的注销配置,在开发时也可以按照自己需求对注销进行个性化定制开启注销 默认开启package co
- paras.xml文件<?xml version="1.0" encoding="UTF-8"
- 前言在很多时候,我们代码中会有很多分支,而且分支下面的代码又有一些复杂的逻辑,相信很多人都喜欢用 if-else/switch-case 去
- 1.通过看logcat下的日志2.通过adb命令3.通过写代码获取3.1写一个工具类打印系统时间3.2 在Application启动的时候打
- 记录一下在项目中用纯 YML(application.yml 或者 application.properties)文件、Java 代码配置
- using System.IO; using System.Collections;static string deleteComments
- 本文实例讲述了Java正则验证电话,手机,邮箱,日期,金额的方法。分享给大家供大家参考,具体如下:package com.hooypay.t
- 背景项目中用到了多数据源,不同的数据源根据业务不同配置在不同的工程中,由maven来统一聚合。但是前几天在开发过程中突然发现项目前台工程的事
- 泛型概述我们都知道集合中是可以存放任意对象的,只要把对象存储集合后,那么这时他们都会被提升成Object类型。当我们在取出每一个对象,并且进
- 在移动端,各个平台或 UI 系统的原始指针事件模型基本都是一致,即:一次完整的事件分为三个阶段:手指按下、手指移动、和手指抬起,而更高级别的
- 前言最近在做一个公共相关的内容,公告里边的内容,打算做成配置化的。但是考虑到存储到数据库,需要建立数据库表;存储到配置组件中,担心配置组件存
- 1.可见性通常,我们无法保证执行读操作的线程能看到其他线程写入的值,因为每个线程都由自己的缓存机制。为了确保多个线程之间对内存写入操作的可见
- 本文实例汇总了DevExpress SplitContainerControl的用法,希望对大家进行C#项目开发能起到一定的帮助作用。具体用
- 本总结我对于JAVA多线程中线程之间的通信方式的理解,主要以代码结合文字的方式来讨论线程间的通信,故摘抄了书中的一些示例代码,具体内容如下①
- 前言青空最近在逛一些社区的时候发现了有很多图片是像素图,感觉挺好玩的。正巧最近自己在学习JavaCV,所以在这里给大家演示一下如何使用Jav
- 最近做项目中遇到ToolBar因为不同的界面toobar不同为了描述统一的风格。相信大家也非常清楚,大多数ToolBar包括以下几个方面左标