elasticsearch节点的transport请求发送处理分析
作者:zziawan 发布时间:2022-04-05 22:38:08
标签:elasticsearch,transport,请求,发送
transport请求的发送和处理过程
前一篇分析对nettytransport的启动及连接,本篇主要分析transport请求的发送和处理过程。
cluster中各个节点之间需要相互发送很多信息,如master检测其它节点是否存在,node节点定期检测master节点是否存储,cluster状态的发布及搜索数据请求等等。为了保证信息传输,elasticsearch定义了一个19字节长度的信息头HEADER_SIZE = 2 + 4 + 8 + 1 + 4,以'E','S'开头,接着是4字节int信息长度,然后是8字节long型信息id,接着是一个字节的status,最后是4字节int型version。
所有的节点间的信息都是以这19个字节开始。同时elasticsearch对于节点间的所有action都定义 了名字,如对master的周期检测action,internal:discovery/zen/fd/master_ping,每个action对应着相应的messagehandler。接下来会进行详分析。
request的发送过程
代码在nettytransport中如下所示:
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
//参数说明:node发送的目的节点,requestId请求id,action action名称,request请求,options包括以下几种操作 RECOVERY,BULK,REG,STATE,PING;
Channel targetChannel = nodeChannel(node, options);//获取对应节点的channel,channel在连接节点时初始化完成(请参考上一篇)
if (compress) {
options.withCompress(true);
}
byte status = 0;
//设置status 包括以下几种STATUS_REQRES = 1 << 0; STATUS_ERROR = 1 << 1; STATUS_COMPRESS = 1 << 2;
status = TransportStatus.setRequest(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);//初始写出流
boolean addedReleaseListener = false;
try {
bStream.skip(NettyHeader.HEADER_SIZE);//留出message header的位置
StreamOutput stream = bStream;
// only compress if asked, and, the request is not bytes, since then only
// the header part is compressed, and the "body" can't be extracted as compressed
if (options.compress() && (!(request instanceof BytesTransportRequest))) {
status = TransportStatus.setCompress(status);
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
}
stream = new HandlesStreamOutput(stream);
// we pick the smallest of the 2, to support both backward and forward compatibility
// note, this is the only place we need to do this, since from here on, we use the serialized version
// as the version to use also when the node receiving this request will send the response with
Version version = Version.smallest(this.version, node.version());
stream.setVersion(version);
stream.writeString(transportServiceAdapter.action(action, version));
ReleasableBytesReference bytes;
ChannelBuffer buffer;
// it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
// that create paged channel buffers, but its tricky to know when to do it (where this option is
// more explicit).
if (request instanceof BytesTransportRequest) {
BytesTransportRequest bRequest = (BytesTransportRequest) request;
assert node.version().equals(bRequest.version());
bRequest.writeThin(stream);
stream.close();
bytes = bStream.bytes();
ChannelBuffer headerBuffer = bytes.toChannelBuffer();
ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);
} else {
request.writeTo(stream);
stream.close();
bytes = bStream.bytes();
buffer = bytes.toChannelBuffer();
}
NettyHeader.writeHeader(buffer, requestId, status, version);//写信息头
ChannelFuture future = targetChannel.write(buffer);//写buffer同时获取future,发送信息发生在这里
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
future.addListener(listener);//添加listener
addedReleaseListener = true;
transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
} finally {
if (!addedReleaseListener) {
Releasables.close(bStream.bytes());
}
}
}
来源:https://www.cnblogs.com/zziawanblog/p/6528616.html


猜你喜欢
- < drawable name="white">#FFFFFF< /drawable><
- 1、加载节点SpringBoot启动时,会执行这个方法:SpringApplication#run,这个方法中会调prepareContex
- 如果需要实现跨服务器上传文件,就是将我们本地的文件上传到资源服务器上,比较好的办法就是通过ftp上传。这里是结合SpringMVC+ftp的
- 曾经有一个朋友问过我一个问题, 一张512*512 150KB PNG格式图片和一张512*512 100KB 压缩比是8的JP
- 时间处理相关类:1.java.util.Date:时间类2.java.text.DateFormat:时间格式化类(抽象类),实现类:jav
- 本文实例讲述了Java线程之守护线程(Daemon)用法。分享给大家供大家参考。具体如下:守护线程(Daemon)Java有两种Thread
- 利用属性动画实现优酷菜单,供大家参考,具体内容如下布局文件<RelativeLayout xmlns:android="ht
- //创建站点地图 private void Create
- okio库的类结构okio 主要的接口和类okio接口和类的说明名称类型描述Source接口定义了输入流的几个基本方法BufferedSou
- Result也是Struts2比较重要的一部分,在Result的配置中常用的有四种类型:dispatcher、redirect、chain和
- foreach遍历LIST读到数据为null当我们在使用mybatis的时候,就避免不了批量更新,或者批量查询使用数组或者list,就避免不
- 合成聚合复用原则合成复用原则又称为组合/聚合复用原则(Composition/Aggregate Reuse Principle, CARP
- 前言一个说难不难,说简单竟看不出来是哪里问题的一个bug。是的 可能自己能力和经验尚浅无法识别,下面你们能否用火眼金睛一眼让bug原形毕露(
- 文件数据流在java语言中,进行文件输入和输出时,经常会使用到FileIntputStream和FileOutputStream两个文件数据
- 一、实体类转换成XML将实体类转换成XML需要使用XmlSerializer类的Serialize方法,将实体类序列化public stat
- Android版本更新实例详解1、导入xutils的jar包 2、在AndroidManifest.xml中添加权限 3、选择下载的路径,和
- 背景:本人不是Java开发人员,经过四年多的历练,可以说是一枚BI攻城师了吧,最近粗糙的写了一个Portal来集成cognos报表,下面就入
- 前几天在看一个cameraCTSbug时,结果在一个java for循环上有点蒙。正好赶上这个点总结一下。java中的控制结构:条件结构这里
- 异步log4j2的location信息打印问题背景:项目改造过程中将log4j2改成异步,发现行号没有打印,于是扒了下官方文档,大概陈述下:
- 本文实例为大家分享了Android利用Canvas类绘制图形的具体代码,供大家参考,具体内容如下首先介绍一下相关基础知识。1.画笔(pain