Kafka Producer中的消息缓存模型图解详解
作者:石臻臻的杂货铺 发布时间:2022-05-03 06:00:13
前言
在阅读本文之前, 希望你可以思考一下下面几个问题, 带着问题去阅读文章会获得更好的效果。
发送消息的时候, 当Broker挂掉了,消息体还能写入到消息缓存中吗?
当消息还存储在缓存中的时候, 假如Producer客户端挂掉了,消息是不是就丢失了?
当最新的ProducerBatch还有空余的内存,但是接下来的一条消息很大,不足以加上上一个Batch中,会怎么办呢?
那么创建ProducerBatch的时候,应该分配多少的内存呢?
什么是消息累加器RecordAccumulator
kafka为了提高Producer客户端的发送吞吐量和提高性能,选择了将消息暂时缓存起来,等到满足一定的条件, 再进行批量发送, 这样可以减少网络请求,提高吞吐量。
而缓存这个消息的就是RecordAccumulator类.
上图就是整个消息存放的缓存模型,我们接下来一个个来讲解。
消息缓存模型
上图表示的就是 消息缓存的模型, 生产的消息就是暂时存放在这个里面。
每条消息,我们按照TopicPartition维度,把他们放在不同的Deque<ProducerBatch> 队列里面。
TopicPartition相同,会在相同Deque<ProducerBatch> 的里面。ProducerBatch : 表示同一个批次的消息, 消息真正发送到Broker端的时候都是按照批次来发送的,
这个批次可能包含一条或者多条消息。如果没有找到消息对应的ProducerBatch队列, 则创建一个队列。
找到ProducerBatch队列队尾的Batch,发现Batch还可以塞下这条消息,则将消息直接塞到这个Batch中
找到ProducerBatch队列队尾的Batch,发现Batch中剩余内存,不够塞下这条消息,则会创建新的Batch
当消息发送成功之后, Batch会被释放掉。
ProducerBatch的内存大小
那么创建ProducerBatch的时候,应该分配多少的内存呢?
先说结论: 当消息预估内存大于batch.size
的时候,则按照消息预估内存创建, 否则按照batch.size
的大小创建(默认16k).
我们来看一段代码,这段代码就是在创建ProducerBatch的时候预估内存的大小
RecordAccumulator#append
// 找到 batch.size 和 这条消息在batch中的总内存大小的 最大值
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
// 申请内存
buffer = free.allocate(size, maxTimeToBlock);
假设当前生产了一条消息为M, 刚好消息M找不到可以存放消息的ProducerBatch(不存在或者满了),那么这个时候就需要创建一个新的ProducerBatch了
预估消息的大小 跟batch.size
默认大小16384(16kb). 对比,取最大值用于申请的内存大小的值。
那么, 这个消息的预估是如何预估的?纯粹的是消息体的大小吗?
DefaultRecordBatch#estimateBatchSizeUpperBound
预估需要的Batch大小,是一个预估值,因为没有考虑压缩算法从额外开销
/**
* 使用给定的键和值获取只有一条记录的批次大小的上限。
* 这只是一个估计,因为它没有考虑使用的压缩算法的额外开销。
**/
static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
}
预估这个消息M的大小 + 一个RECORD_BATCH_OVERHEAD的大小
RECORD_BATCH_OVERHEAD是一个Batch里面的一些基本元信息,总共占用了 61B
消息M的大小也并不是单单的只有消息体的大小,总大小=(key,value,headers)的大小+MAX_RECORD_OVERHEAD
MAX_RECORD_OVERHEAD :一条消息头最大占用空间, 最大值为21B
也就是说创建一个ProducerBatch,最少就要83B .
比如我发送一条消息 " 1 " , 预估得到的大小是 86B, 跟batch.size(默认16384)
相比取最大值。 那么申请内存的时候取最大值 16384 。
关于Batch的结构和消息的结构,我们回头单独用一篇文章来讲解。
内存分配
我们都知道RecordAccumulator里面的缓存大小是一开始定义好的, 由buffer.memory
控制, 默认33554432 (32M)
当生产的速度大于发送速度的时候,就可能出现Producer写入阻塞。
而且频繁的创建和释放ProducerBatch,会导致频繁GC, 所有kafka中有个缓存池的概念,这个缓存池会被重复使用,但是只有固定( batch.size)的大小才能够使用缓存池。
PS:以下16k指得是 batch.size的默认值.
Batch的创建和释放
1. 内存16K 缓存池中有可用内存
①. 创建Batch的时候, 会去缓存池中,获取队首的一块内存ByteBuffer 使用。
②. 消息发送完成,释放Batch, 则会把这个ByteBuffer,放到缓存池的队尾中,并且调用ByteBuffer.clear
清空数据。以便下次重复使用
2. 内存16K 缓存池中无可用内存
①. 创建Batch的时候, 去非缓存池中的内存获取一部分内存用于创建Batch. 注意:这里说的获取内存给Batch, 其实就是让 非缓存池nonPooledAvailableMemory 减少 16K 的内存, 然后Batch正常创建就行了, 不要误以为好像真的发生了内存的转移。
②. 消息发送完成,释放Batch, 则会把这个ByteBuffer,放到缓存池的队尾中,并且调用ByteBuffer.clear
清空数据, 以便下次重复使用
3. 内存非16K 非缓存池中内存够用
①. 创建Batch的时候, 去非缓存池(nonPooledAvailableMemory)内存获取一部分内存用于创建Batch. 注意:这里说的获取内存给Batch, 其实就是让 非缓存池(nonPooledAvailableMemory) 减少对应的内存, 然后Batch正常创建就行了, 不要误以为好像真的发生了内存的转移。
②. 消息发送完成,释放Batch, 纯粹的是在非缓存池(nonPooledAvailableMemory)中加上刚刚释放的Batch内存大小。 当然这个Batch会被GC掉
4. 内存非16K 非缓存池内存不够用
①. 先尝试将 缓存池中的内存一个一个释放到 非缓存池中, 直到非缓存池中的内存够用与创建Batch了
②. 创建Batch的时候, 去非缓存池(nonPooledAvailableMemory)内存获取一部分内存用于创建Batch. 注意:这里说的获取内存给Batch, 其实就是让 非缓存池(nonPooledAvailableMemory) 减少对应的内存, 然后Batch正常创建就行了, 不要误以为好像真的发生了内存的转移。
③. 消息发送完成,释放Batch, 纯粹的是在非缓存池(nonPooledAvailableMemory)中加上刚刚释放的Batch内存大小。 当然这个Batch会被GC掉
例如: 下面我们需要创建 48k的batch, 因为超过了16k,所以需要在非缓存池中分配内存, 但是非缓存池中当前可用内存为0 , 分配不了, 这个时候就会尝试去 缓存池里面释放一部分内存到 非缓存池。
释放第一个ByteBuffer(16k) 不够,则继续释放第二个,直到释放了3个之后总共48k,发现内存这时候够了, 再去创建Batch。
注意:这里我们涉及到的 非缓存池中的内存分配, 仅仅指的的内存数字的增加和减少。
问题和答案
发送消息的时候, 当Broker挂掉了,消息体还能写入到消息缓存中吗?
当Broker挂掉了,Producer会提示下面的警告⚠️, 但是发送消息过程中
这个消息体还是可以写入到 消息缓存中的,也仅仅是写到到缓存中而已。
WARN [Producer clientId=console-producer] Connection to node 0 (/172.23.164.192:9090) could not be established. Broker may not be available
当最新的ProducerBatch还有空余的内存,但是接下来的一条消息很大,不足以加上上一个Batch中,会怎么办呢?
那么会创建新的ProducerBatch。
那么创建ProducerBatch的时候,应该分配多少的内存呢?
触发创建ProducerBatch的那条消息预估大小大于batch.size ,则以预估内存创建。
否则,以batch.size创建。
还有一个问题供大家思考:
当消息还存储在缓存中的时候, 假如Producer客户端挂掉了,消息是不是就丢失了?
来源:https://blog.csdn.net/u010634066/article/details/123376411


猜你喜欢
- springboot+mybatis整合过程中,开启控制台sql语句打印的多种方式:方法一1>(spring+mybatis)在myb
- Unsafe类介绍第一次看到这个类时被它的名字吓到了,居然还有一个类自名Unsafe?读完本文,大家也能发现Unsafe类确实有点不那么安全
- 1.关于JSR-303JSR-303规范(Bean Validation规范)提供了对 Java EE 和 Java SE 中的 Java
- 一、概述EventBus是一款针对Android优化的发布/订阅事件总线。主要功能是替代Intent,Handler,BroadCast在F
- 本文实例为大家分享了Android九宫格图片展示的具体代码,供大家参考,具体内容如下此篇博客为整理文章,供大家学习。1.首先下载common
- 本文实例为大家分享了Android实现层叠卡片式banner的具体代码,供大家参考,具体内容如下效果图如下:背景由于公司VIP模块项目需要,
- 在SpringBoot中,当需要获取到配置文件数据时,除了可以用Spring自带的@Value注解外,SpringBoot提供了一种更加方便
- 本Demo为练手小项目,主要是熟悉目前主流APP的架构模式.此项目中采用MVC设计模式,纯代码和少许XIB方式实现.主要实现了朋友圈功能和摇
- C# 利用代理爬虫网页实现代码:// yanggang@mimvp.com// http://proxy.mimvp.com// 2015-
- 现在就为大家介绍一种基于因子分解的RSA算法,这种加密算法有两种实现形式:1、公钥加密,私钥解密;2、私钥加密,公钥解密。下面就为大家分析一
- 本文实例讲述了java读取properties文件的方法。分享给大家供大家参考。具体实现方法如下:package com.test.demo
- 本文实例讲述了Java实现指定线程执行顺序的三种方式。分享给大家供大家参考,具体如下:方法一:通过共享对象锁加上可见变量来实现。public
- 前言最近在学习spring,抽空会将学习的知识总结下面,本文我们会接触spring 4的新功能:@Conditional注解。在之前的spr
- 结论先行Kotlin协程中的Channel用于处理多个数据组合的流,随用随取,时刻准备着,就像自来水一样,打开开关就有水了。Channel使
- 以前使用spring的使用要注入property要配置PropertyPlaceholder的bean对象。在springboot除&nbs
- Java 8中引入了CompletableFuture类,它是一种方便的异步编程工具,可以处理各种异步操作,如网络请求、文件IO和数据库操作
- 去掉DataTable中(所有列的数据都相同)重复的行: //DataView dv = dt3.DefaultView;&nbs
- ArrayBlockingQueue介绍ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列。线程安全是指,ArrayB
- 本文实例讲述了C#基于HttpWebRequest实现发送HTTP请求的方法。分享给大家供大家参考,具体如下:调用第三方API的时候要用到H
- 本文实例讲述了Java编程实现基于TCP协议的Socket聊天室。分享给大家供大家参考,具体如下:这里使用Socket套接字进行编程,完成的