软件编程
位置:首页>> 软件编程>> java编程>> elasticsearch节点的transport请求发送处理分析

elasticsearch节点的transport请求发送处理分析

作者:zziawan  发布时间:2022-04-05 22:38:08 

标签:elasticsearch,transport,请求,发送

transport请求的发送和处理过程

前一篇分析对nettytransport的启动及连接,本篇主要分析transport请求的发送和处理过程。

cluster中各个节点之间需要相互发送很多信息,如master检测其它节点是否存在,node节点定期检测master节点是否存储,cluster状态的发布及搜索数据请求等等。为了保证信息传输,elasticsearch定义了一个19字节长度的信息头HEADER_SIZE = 2 + 4 + 8 + 1 + 4,以'E','S'开头,接着是4字节int信息长度,然后是8字节long型信息id,接着是一个字节的status,最后是4字节int型version。

所有的节点间的信息都是以这19个字节开始。同时elasticsearch对于节点间的所有action都定义 了名字,如对master的周期检测action,internal:discovery/zen/fd/master_ping,每个action对应着相应的messagehandler。接下来会进行详分析。

request的发送过程

代码在nettytransport中如下所示:

public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
       //参数说明:node发送的目的节点,requestId请求id,action action名称,request请求,options包括以下几种操作 RECOVERY,BULK,REG,STATE,PING;
Channel targetChannel = nodeChannel(node, options);//获取对应节点的channel,channel在连接节点时初始化完成(请参考上一篇)
       if (compress) {
           options.withCompress(true);
       }
       byte status = 0;
//设置status 包括以下几种STATUS_REQRES = 1 << 0; STATUS_ERROR = 1 << 1; STATUS_COMPRESS = 1 << 2;
status = TransportStatus.setRequest(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);//初始写出流
       boolean addedReleaseListener = false;
       try {
           bStream.skip(NettyHeader.HEADER_SIZE);//留出message header的位置
           StreamOutput stream = bStream;
           // only compress if asked, and, the request is not bytes, since then only
           // the header part is compressed, and the "body" can't be extracted as compressed
           if (options.compress() && (!(request instanceof BytesTransportRequest))) {
               status = TransportStatus.setCompress(status);
               stream = CompressorFactory.defaultCompressor().streamOutput(stream);
           }
           stream = new HandlesStreamOutput(stream);
           // we pick the smallest of the 2, to support both backward and forward compatibility
           // note, this is the only place we need to do this, since from here on, we use the serialized version
           // as the version to use also when the node receiving this request will send the response with
           Version version = Version.smallest(this.version, node.version());
           stream.setVersion(version);
           stream.writeString(transportServiceAdapter.action(action, version));
           ReleasableBytesReference bytes;
           ChannelBuffer buffer;
           // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
           // that create paged channel buffers, but its tricky to know when to do it (where this option is
           // more explicit).
           if (request instanceof BytesTransportRequest) {
               BytesTransportRequest bRequest = (BytesTransportRequest) request;
               assert node.version().equals(bRequest.version());
               bRequest.writeThin(stream);
               stream.close();
               bytes = bStream.bytes();
               ChannelBuffer headerBuffer = bytes.toChannelBuffer();
               ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
               buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);
           } else {
               request.writeTo(stream);
               stream.close();
               bytes = bStream.bytes();
               buffer = bytes.toChannelBuffer();
           }
           NettyHeader.writeHeader(buffer, requestId, status, version);//写信息头
           ChannelFuture future = targetChannel.write(buffer);//写buffer同时获取future,发送信息发生在这里
           ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
           future.addListener(listener);//添加listener
           addedReleaseListener = true;
           transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
       } finally {
           if (!addedReleaseListener) {
               Releasables.close(bStream.bytes());
           }
       }
   }

来源:https://www.cnblogs.com/zziawanblog/p/6528616.html

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com