Netty分布式pipeline管道传播outBound事件源码解析
作者:向南是个万人迷 发布时间:2022-10-17 23:43:06
了解了inbound事件的传播过程, 对于学习outbound事件传输的流程, 也不会太困难
outbound事件传输流程
在我们业务代码中, 有可能使用wirte方法往写数据:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().write("test data");
}
当然, 直接调用write方法是不能往对方channel中写入数据的, 因为这种方式只能写入到缓冲区, 还要调用flush方法才能将缓冲区数据刷到channel中, 或者直接调用writeAndFlush方法, 有关逻辑, 我们会在后面章节中详细讲解, 这里只是以wirte方法为例为了演示outbound事件的传播的流程
这里我们同样给出两种写法
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//写法1
ctx.channel().write("test data");
//写法2
ctx.write("test data");
}
这两种写法有什么区别, 我们首先跟到第一种写法中去:
ctx.channel().write("test data");
这里获取ctx所绑定的channel
我们跟到AbstractChannel的write方法中:
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
这里pipeline是DefaultChannelPipeline
跟到其write方法中:
public final ChannelFuture write(Object msg) {
//从tail节点开始(从最后的节点往前写)
return tail.write(msg);
}
这里调用tail节点write方法, 这里我们应该能分析到, outbound事件, 是通过tail节点开始往上传播的, 带着这点猜想, 我们继往下看
其实tail节点并没有重写write方法, 最终会调用其父类AbstractChannelHandlerContext的write方法
AbstractChannelHandlerContext的write方法:
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
我们看到这里有个newPromise()这个方法, 这里是创建一个Promise对象, 有关Promise的相关知识我们会在以后的章节剖析
我们继续跟write:
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
//代码省略
write(msg, false, promise);
return promise;
}
继续跟write:
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
//没有调flush
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
这里跟我们上一小节剖析过channelRead方法有点类似, 但是事件传输的方向有所不同, 这里findContextOutbound()是获取上一个标注outbound事件的HandlerContext
跟到findContextOutbound中
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
这里的逻辑我们似曾相识, 跟我们上一小节的findContextInbound()方法有点像, 只是过程是反过来的
在这里, 会找到当前context的上一个节点, 如果标注的事件不是outbound事件, 则继续往上找, 意思就是找到上一个标注outbound事件的节点
回到write方法:
AbstractChannelHandlerContext next = findContextOutbound();
这里将找到节点赋值到next属性中
因为我们之前分析的write事件是从tail节点传播的, 所以上一个节点就有可能是用户自定的handler所属的context
然后判断是否为当前eventLoop线程, 如果是不是, 则封装成task异步执行, 如果不是, 则继续判断是否调用了flush方法, 因为我们这里没有调用, 所以会执行到next.invokeWrite(m, promise),
我们继续跟invokeWrite
private void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
这里会判断当前handler的状态是否是添加状态, 这里返回的是true, 将会走到invokeWrite0(msg, promise)这一步
继续跟invokeWrite0
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
//调用当前handler的wirte()方法
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
这里的逻辑也似曾相识, 调用了当前节点包装的handler的write方法, 如果用户没有重写write方法, 则会交给其父类处理
我们跟到ChannelOutboundHandlerAdapter的write方法中看:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
这里调用了当前ctx的write方法, 这种写法和我们小节开始的写法是相同的, 我们回顾一下:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//写法1
ctx.channel().write("test data");
//写法2
ctx.write("test data");
}
我们跟到其write方法中, 这里走到的是AbstractChannelHandlerContext类的write方法:
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
//没有调flush
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
又是我们所熟悉逻辑, 找到当前节点的上一个标注事件为outbound事件的节点, 继续执行invokeWrite方法, 根据之前的剖析, 我们知道最终会执行到上一个handler的write方法中
走到这里已经不难理解, ctx.channel().write("test data")其实是从tail节点开始传播写事件, 而ctx.write("test data")是从自身开始传播写事件
所以, 在handler中如果重写了write方法要传递write事件, 一定采用ctx.write("test data")这种方式或者交给其父类处理处理, 而不能采用ctx.channel().write("test data")这种方式, 因为会造成每次事件传输到这里都会从tail节点重新传输, 导致不可预知的错误
如果用代码中没有重写handler的write方法, 则事件会一直往上传输, 当传输完所有的outbound节点之后, 最后会走到head节点的wirte方法中
我们跟到HeadContext的write方法中
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
我们看到write事件最终会流向这里, 通过unsafe对象进行最终的写操作
有关inbound事件和outbound事件的传输, 可通过下图进行说明:
来源:https://www.cnblogs.com/xiangnan6122/p/10204459.html


猜你喜欢
- 笔者最近需要上位机与下位机进行数据交互,在广泛参考大佬的资料后,较为完善地使用Textbox控件进行数据输入的功能。程序段主要功能:实现输入
- 目录绘制流程Measure 测量流程MeasureSpeclayout 布局流程draw 绘制流程相关类 & 概念DecorView
- 二、简介多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力,但频繁的创建线程的开
- Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时候可以指
- DataSource在数据库应用中,客户端与数据库服务端建立的连接对象(Connection)是宝贵的资源,每次请求数据库都创建连接,使用完
- java中实现多线程的方法有两种:继承Thread类和实现runnable接口。1.继承Thread类,重写父类run()方法 public
- Java序列化JSON时long型数值,会出现精度丢失的问题。原因:java中得long能表示的范围比js中number大,也就意味着部分数
- 前言工作中使用mybatis时我们需要根据数据表字段创建pojo类、mapper文件以及dao类,并且需要配置它们之间的依赖关系,这样的工作
- 先来回忆下在mybatis中的resultMap作用和是什么resultMap的作用是什么在使用传统的mybatis时,我们一般都会在xml
- 目录一、环境搭建二、RetryTemplate2.1 RetryTemplate2.2 RetryListener2.3 回退策略2.3.1
- 1.SpringBoot AOP功能1.1 LTW与不同的切面织入时机AOP——面向切面编程,通过为
- 一、概述1、事务ACID特性事务将一系列的工作视为一个工作单元,它具有 ACID 特性:A:Atomicity 不可分性 也就是说
- HashTable和HashMap区别第一,继承的父类不同。Hashtable继承自Dictionary类,而HashMap继承自Abstr
- 一、MyBatis的增删改查1.1、新增<!--int insertUser();--><insert id="
- 1、MediaCodec调用流程首先,我们先看下MediaCodec::CreateByType函数里面做了什么:sp<MediaCo
- 本文是针对AndBase框架学习整理的第二篇笔记,想要了解AndBase框架的朋友可以阅读本文,大家共同学习。使用AbActivity内部封
- 在最近写的一个天气APP中用到了圆形头像这样的一个样式,中间是圆形的头像(被圆形切割的图片),周围是一个带颜色的圆环。如下图所示,今天就来说
- 根据约定,在使用java编程的时候应尽可能的使用现有的类库,当然你也可以自己编写一个排序的方法,或者框架,但是有几个人能写得比JDK里的还要
- 使用Spring Boot 与Dubbo集成,这里我之前尝试了使用注解的方式,简单的使用注解注册服务其实是没有问题的,但是当你涉及到使用注解
- 在日常的开发中、我们都知道,Java的内存清理是通过垃圾回收器进行的,那么其是如何将没用的对象被被清理掉的呢?Java 语言的内存自动回收称