kafka的消息存储机制和原理分析
作者:zhangxm_qz 发布时间:2022-02-01 13:07:39
消息的保存路径
消息发送端发送消息到 broker 上以后,消息是如何持久化的?
数据分片
kafka 使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。
Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,一个分片 并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是<topic_name>_<partition_id>。
比如创建一个名为firstTopic的topic,其中有3个partition,那么在 kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic-0~3
多个分区在集群中多个broker上的分配方法
1.将所有 N Broker 和待分配的 i 个 Partition 排序
2.将第 i 个 Partition 分配到第(i mod n)个 Broker 上
log分段
每个分片目录中,kafka 通过分段的方式将 数据 分为多个 LogSegment,一个 LogSegment 对应磁盘上的一个日志文件(00000000000000000000.log)和一个索引文件(如上:00000000000000000000.index),其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。
每个LogSegment 的大小可以在server.properties 中log.segment.bytes=107370 (设置分段大小,默认是1gb)选项进行设置。
segment 的 index file 和 data file 2 个文件一一对应,成对出现,后缀".index"和“.log”分别表示为 segment 索引文件、数据文件.命名规则:partion 全局的第一个 segment从 0 开始,后续每个 segment 文件名为上一个 segment文件最后一条消息的 offset 值进行递增。数值最大为 64 位long 大小,20 位数字字符长度,没有数字用 0 填充
第一个 log 文件的最后一个 offset 为:5376,所以下一个segment 的文件命名为: 0000000000000005376.log。
对应的 index 为 00000000000000005376.index
kafka 这种分片和分段策略,避免了数据量过大时,数据文件文件无限扩张带来的隐患,更有助于消息文件的维护以及被消费的消息的清理。
日志和索引文件内容分析
通过下面这条命令可以看到 kafka 消息日志的内容
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log
输出结果为:
offset: 5376 position: 102124 CreateTime: 1531477349287isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5376
可以看到一条消息,会包含很多的字段,如下:
offset: 5371 position: 102124 CreateTime: 1531477349286isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5371
各字段的意义:
offset
:记录号 ;position
:偏移量;createTime
:创建时间、keysize
和valuesize
表示key
和value
的大小compresscodec
:表示压缩编码payload
:表示消息的具体内容
为了提高查找消息的性能,kafka为每一个日志文件添加 了2 个索引文件:OffsetIndex 和 TimeIndex,分别对应*.index以及*.timeindex, *.TimeIndex 是映射时间戳和相对 offset的文件
查看索引内容命令:
sh kafka-run-class.shkafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log
索引文件和日志文件内容关系如下
如上图所示,index 文件中存储了索引以及物理偏移量。
log 文件存储了消息的内容。
索引文件中保存了部分offset和偏移量position的对应关系。
比如 index文件中 [4053,80899],表示在 log 文件中,对应的是第 4053 条记录,物理偏移量(position)为 80899.
在 partition 中通过 offset 查找 message过程
根据 offset 的值,查找 segment 段中的 index 索引文件。由于索引文件命名是以上一个文件的最后一个offset 进行命名的,所以,使用二分查找算法能够根据offset 快速定位到指定的索引文件
找到索引文件后,根据 offset 进行定位,找到索引文件中的匹配范围的偏移量position。(kafka 采用稀疏索引的方式来提高查找性能)
得到 position 以后,再到对应的 log 文件中,从 position处开始查找 offset 对应的消息,将每条消息的 offset 与目标 offset 进行比较,直到找到消息
比如说,我们要查找 offset=2490 这条消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]这个索引,再到 log 文件中,根据 49111 这个 position 开始查找,比较每条消息的 offset 是否大于等于 2490。最后查找到对应的消息以后返回
日志的清除策略以及压缩策略
日志的清理策略有两个
根据消息的保留时间,当消息在 kafka 中保存的时间超过了指定的时间,就会触发清理过程
根据 topic 存储的数据大小,当 topic 所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。
通过 log.retention.bytes 和 log.retention.hours 这两个参数来设置,当其中任意一个达到要求,都会执行删除。默认的保留时间是:7 天
kafka会启动一个后台线程,定期检查是否存在可以删除的消息。
日志压缩策略
Kafka 还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的 key 和 value 的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心 key 对应的最新的 value。
因此,我们可以开启 kafka 的日志压缩功能,服务端会在后台启动Cleaner线程池,定期将相同的key进行合并,只保留最新的 value 值。日志的压缩原理如下图:
消息写入的性能
顺序写
我们现在大部分企业仍然用的是机械结构的磁盘,如果把消息以随机的方式写入到磁盘,那么磁盘首先要做的就是寻址,也就是定位到数据所在的物理地址,在磁盘上就要找到对应的柱面、磁头以及对应的扇区;
这个过程相对内存来说会消耗大量时间,为了规避随机读写带来的时间消耗,kafka 采用顺序写的方式存储数据。
零拷贝
即使采用顺序写,但是频繁的 I/O 操作仍然会造成磁盘的性能瓶颈,所以 kafka还有一个性能策略:零拷贝
消息从发送到落地保存,broker 维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。
在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过 socket 发送给消费者。
虽然这个操作描述起来很简单,但实际上经历了很多步骤。如下:
操作系统将数据从磁盘读入到内核空间的页缓存
应用程序将数据从内核空间读入到用户空间缓存中
应用程序将数据写回到内核空间到 socket 缓存中
操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出
这个过程涉及到 4 次上下文切换以及 4 次数据复制,并且有两次复制操作是由 CPU 完成。但是这个过程中,数据完全没有进行变化,仅仅是从磁盘复制到网卡缓冲区。通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。
现代的 unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到 socket;在 Linux 中,是通过 sendfile 系统调用来完成的。
Java 提供了访问这个系统调用的方法:FileChannel.transferTo API。
使用 sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。
所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的
来源:https://blog.csdn.net/zhangxm_qz/article/details/87636094


猜你喜欢
- 如果想分析Android下 某个APP的网络数据交互,需要在Android手机上抓包,最常用的抓包工具非tcpdump莫属,用tcpdump
- 简单理解泛型泛型是Java SE 1.5的新特性,泛型的本质是参数化类型,也就是说所操作的数据类型被指定为一个参数。通俗点将就是“类型的变量
- SpringBoot2.x过后static下的静态资源无法访问package com.example.thymeleaf.commons;i
- 1. 为什么写这篇文章?事情是这样的,在 2021年6月10日早上我在CSDN上发布了文章《你真的懂Java怎么输出Hello World吗
- 开发环境: springboot + mybatis plus场景:在DAO的bean中有byte[]类时,写入可以成功,但是读取不行。从错
- Android独有的安全机制,除了权限机制外,另外一个就是签名机制了。签名机制主要用在以下两个主要场合起到其作用:升级App和权限检查。升级
- 本文实例为大家分享了Java实现简单日历界面的具体代码,供大家参考,具体内容如下请使用JFrame、JPanel、JButton、JLabe
- 用 Environment 类: &
- Java实现简单的类似QQ聊天工具,供大家参考,具体内容如下所使用到的知识点:java socket编程之TCP协议java Swing简单
- 在ios手机上经常看到页面上下滑动回弹效果,安卓中没有原生控件支持,这里自己就去自定义一个scrollview实现回弹效果1. 新建MySc
- 单例:Singleton,是指仅仅被实例化一次的类。饿汉单例设计模式一、饿汉设计模式public class SingletonHungry
- 栅栏类似闭锁,但是它们是有区别的.1.闭锁用来等待事件,而栅栏用于等待其他线程.什么意思呢?就是说闭锁用来等待的事件就是countDown事
- 通常浏览器都有将网页生成图片的功能,本文实例讲述了Winform实现将网页生成图片的方法。分享给大家供大家参考。具体方法如下:工具截图如下:
- 前言公司的邮件系统用的是 * 的 Lotus notes, 你敢信?最近要实现一个功能,邮件提醒功能,就是通过自动发送提醒邮件 前
- application.ymlspring: datasource: username: root password
- 我们在shader中对贴图处理时,有时候会有一些比较复杂的运算,比方说三角函数,开方等,一般情况下,如果可以在越上层做运算,性能会越高。C#
- 1:gc日志输出在jvm启动参数中加入 -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimes
- 网上汗牛充栋的文章都是介绍Android远程服务的,一个个将Binder机制、AIDL讲得头头是道,然而没有几个人能够给出清晰的范例说明如何
- 在网上找了半天,说的都没有解决我的问题,我自己花了点时间在idea中找到并解决了问题,希望可以帮助到大家。File---->setti
- 1、定义方法的格式 访问修饰符 返回类型 方法名 (参数列表) { // 方法的主体… }2、静态方法在大多数时候,我们定义写一个方法,会把