RocketMQ消息过滤与查询的实现
作者:徘徊笔记(同公众号) 发布时间:2023-06-26 10:04:25
消息过滤
RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。
RocketMQ这么做是还是在于其Producer端写入消息和Consomer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。
其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。
主要支持如下2种的过滤方式
(1) Tag过滤方式:
Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。
其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。
Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。
(2) SQL92的过滤方式:
这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。
每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性。
消息查询
RocketMQ支持按照下面两种维度(“按照Message Id查询消息”、“按照Message Key查询消息”)进行消息查询。
按照MessageId查询消息
RocketMQ中的MessageId的长度总共有16字节,其中包含了消息存储主机地址(IP地址和端口),消息Commit Log offset。
“按照MessageId查询消息”在RocketMQ中具体做法是:Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后通过Remoting通信层发送(业务请求码:VIEW_MESSAGE_BY_ID)。
Broker端走的是QueryMessageProcessor,读取消息的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回。
按照Message Key查询消息
“按照Message Key查询消息”,主要是基于RocketMQ的IndexFile索引文件来实现的。RocketMQ的索引文件逻辑结构,类似JDK中HashMap的实现。
索引文件的具体结构如下:
IndexFile索引文件为用户提供通过“按照Message Key查询消息”的消息索引查询服务,IndexFile文件的存储位置是:$HOME\store\index\${fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W\*4+2000W\*20= 420000040个字节大小。
如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。
如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。
其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共20 Byte。
NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。
Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图,40 Byte 的Header用于保存一些总的统计信息,4\*500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。
20\*2000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。
“按照Message Key查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。
来源:https://blog.csdn.net/ph3636/article/details/89066990
猜你喜欢
- 在使用JDBC的时候,数据库据连接是非常宝贵的资源。为了复用这些资源,可以将连接保存在一个队列中。当需要的时候可以从队列中取出未使用的连接。
- 一.为什么要用线程池先来看个简单的例子1.直接new Thread的情况:public static void main(String[]
- SingleClick:@Retention(AnnotationRetention.RUNTIME)@Target(AnnotationT
- 一、前言环境:jdk 1.8,SpringCloud Greenwich.SR2。如题,springcloud config-client启
- 前言Spark Sql可以通过UDF来对DataFrame的Column进行自定义操作。在特定场景下定义UDF可能需要用到Spark Con
- 一、前言无论承接什么样的需求,是不是身边总有那么几个人代码写的烂,但是却时常有测试小姐姐过来聊天(求改bug)、有产品小伙伴送吃的(求写需求
- 此项目使用了OpenCVSharp加载本地摄像头,多个摄像头支持切换展示,也可以展示rtsp地址。使用NuGet如下:代码如下一、创建Mai
- Springboot 内置tomcat禁止不安全HTTP方法1、在tomcat的web.xml中可以配置如下内容让tomcat禁止不安全的H
- 一、概述本质上,这是一个包含有可选值的包装类,这意味着 Optional 类既可以含有对象也可以为空。Optional 是 Java 实现函
- Spring 是一个开源框架,是为了解决企业应用程序开发复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许您选择使用哪一个组件,
- 一个Maprduce程序主要包括三部分:Mapper类、Reducer类、执行类。Maven项目下所需依赖<dependencies&
- JAVA中实现pdf转图片可以通过第三方提供的架包,这里介绍几种常用的,可以根据自身需求选择使用。一、icepdf。有收费版和开源版,几种方
- ContentProvider是内容提供者,可以跨进程提供数据。大家都知道,ContentProvider的启动,是在Application
- 1、异常分类通常分为三类:系统异常(SystemException),业务异常(BusinessException)和其他异常(Except
- 一、前言对于任何框架而言,在使用前都要进行一系列的初始化,MyBatis也不例外。二、MyBatis的初始化做了什么2.1 Mybatis的
- 前序(先序)遍历中序遍历后续遍历层序遍历如图二叉树:二叉树结点结构public class TreeNode { int val
- 代码如下import java.util.concurrent.Callable;import java.util.concurrent.E
- C++对string进行大小写转换操作方法方法一:使用C语言之前的方法,使用函数,进行转换#include <iostream>
- 一、技术介绍1.chatgpt-java是一个OpenAI的Java版SDK,支持开箱即用。目前以支持官网全部Api。支持最新版本GPT-3
- 第一次写上传图片的代码,碰到很多问题。昨天做了整整一天,终于在晚上的时候成功了。大声欢呼。但是,做完之后,还是有很多问题想不通。所以在这里也