kafka生产者发送消息流程深入分析讲解
作者:william_cr7 发布时间:2022-10-12 08:39:30
消息发送过程
消息的发送可能会经过 * 、序列化、分区器等过程。消息发送的主要涉及两个线程,分别为main线程和sender线程。
如图所示,主线程由 afkaProducer 创建消息,然后通过可能的 * 、序列化器和分区器的作用之后缓存到消息累加器RecordAccumulator (也称为消息收集器)中。 Sender 线程负责从RecordAccumulator 获取消息并将其发送到 Kafka中。
*
在消息序列化之前会经过消息 * ,自定义 * 需要实现ProducerInterceptor接口,接口主要有两个方案#onSend和#onAcknowledgement,在消息发送之前会调用前者方法,可以在发送之前假如处理逻辑,比如计费。在收到服务端ack响应后会触发后者方法。需要注意的是 * 中不要加入过多的复杂业务逻辑,以免影响发送效率。
消息分区
消息ProducerRecord会将消息路由到那个分区中,分两种情况:
1.指定了partition字段
如果消息ProducerRecord中指定了 partition字段,那么就不需要走分区器,直接发往指定得partition分区中。
2.没有指定partition,但自定义了分区器
3.没指定parittion,也没有自定义分区器,但key不为空
4.没指定parittion,也没有自定义分区器,key也为空
看源码
// KafkaProducer#partition
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
//指定分区partition则直接返回,否则走分区器
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
//DefaultPartitioner#partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
partition 方法中定义了分区分配逻辑 如果 ke 不为 null , 那 么默认的分区器会对 key 进行哈 希(采 MurmurHash2 算法 ,具备高运算性能及 低碰 撞率),最终根据得到 哈希值来 算分区号, 有相同 key 的消息会被写入同一个分区 如果 key null ,那么消息将会以轮询的方式发往主题内的各个可用分区。
消息累加器
分区确定好了之后,消息并不是直接发送给broker,因为一个个发送网络消耗太大,而是先缓存到消息累加器RecordAccumulator,RecordAccumulator主要用来缓存消息 Sender 线程可以批量发送,进 减少网络传输 的资源消耗以提升性能 RecordAccumulator 缓存的大 小可以通过生产者客户端参数 buffer memory 配置,默认值为 33554432B ,即 32MB如果生产者发送消息的速度超过发 送到服务器的速度 ,则会导致生产者空间不足,这个时候 KafkaProducer的send()方法调用要么 被阻塞,要么抛出异常,这个取决于参数 max block ms 的配置,此参数的默认值为 60秒。
消息累加器本质上是个ConcurrentMap,
ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
发送流程源码分析
//KafkaProducer
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
//首先执行 * 链
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
long nowMs = time.milliseconds();
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
//key序列化
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
//value序列化
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
//获取分区partition
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
//消息压缩
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
//判断消息是否超过最大允许大小,消息缓存空间是否已满
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
if (log.isTraceEnabled()) {
log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
}
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional()) {
transactionManager.failIfNotReadyForSend();
}
//将消息缓存在消息累加器RecordAccumulator中
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
//开辟新的ProducerBatch
if (result.abortForNewBatch) {
int prevPartition = partition;
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
// producer callback will make sure to call both 'callback' and interceptor callback
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
//判断消息是否已满,唤醒sender线程进行发送消息
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
生产消息的可靠性
消息发送到broker,什么情况下生产者才确定消息写入成功了呢?ack是生产者一个重要的参数,它有三个值,ack=1表示leader副本写入成功服务端即可返回给生产者,是吞吐量和消息可靠性的平衡方案;ack=0表示生产者发送消息之后不需要等服务端响应,这种消息丢失风险最大;ack=-1表示生产者需要等等ISR中所有副本写入成功后才能收到响应,这种消息可靠性最高但吞吐量也是最小的。
来源:https://blog.csdn.net/CHINACR07/article/details/129699173


猜你喜欢
- 上篇文章说了通过RestTemplate实现微服务之间访问:https://www.jb51.net/article/252981.htm,
- 1、在线支付概述什么是在线支付呢?没错,就是在网上花钱!大家一定有过这样的经历。但是你可能不太了解在线支付的“内情”,下面我们来了解一下!如
- 1. 可变参数在JDK1.5之后,如果我们定义一个方法需要接受多个参数,并且多个参数类型一致,我们可以对其简化.格式:修饰符 返回值类型 方
- 1、导包,四大核心包,一个切面包(AOP),logging,web,springmvc2、配置文件,核心代码如下:web.xml<se
- 1、事务介绍事务(Transaction),一般是指要做的或所做的事情。在计算机术语中是指访问并可能更新数据库中各种数据项的一个程序执行单元
- 之前写轮播条或者指示器的时候都是UI图里面直接有,这样的效果并不好,给用户的体验比较差,所以闲暇之余自己写了个指示器,可以展现出一个优雅的效
- 1.for循环import com.google.common.base.Function;import com.google.common
- 1.spring配置文件<bean id="configproperties"  
- 1. 概述当一个对象的内在状态改变时允许改变其行为,这个对象看起来像是改变了其类。2. 解决的问题主要解决的是当控制一个对象状态转换的条件表
- 一、整体设计1、需求分析池化技术是计算机中的一种设计模式,内存池是常见的池化技术之一,它能够有效的提高内存的申请和释放效率以及内存碎片等问题
- 程序的结构分类:顺序结构:按照写代码的顺序 一次执行选择结构:根据条件的不同有选择的执行不同的代码循环结构:在一定条件下 反复执行某一片代码
- MD5加密在我们的程序中,不管是什么,都会有安全问题,今天就说的是MD5加密的方法MD5是哈希算法,也就是 从明文A到密文B很容易,但是从密
- 1.比较两个字符串时使用“==”还是equals()方法?当然是equals方法。“==”测试的是两个对象的引用是否相同,而equals()
- 前言在工作总常常需要用到缓存,而redis往往是首选,但是短期的数据缓存一般我们还是会用到本地缓存。本文提供一个我在工作中用到的缓存工具,该
- 您已经看到很多包含视频内容的应用程序,比如带有视频教程的食谱应用程序、电影应用程序和体育相关的应用程序。您是否想知道如何将视频内容添加到您的
- 1、java中启动exe程序 ,并添加传参String[] cmd = {"hh.exe","12315&qu
- 一、Jackson简介说明:本篇讲的是Jackson的详细用法,Jackson工具类在文章最后,直接复制粘贴即可使用。 Jackson是公司
- 本文讲述的是Android中RelativeLayout、FrameLayout的用法。具体如下:RelativeLayout是一个按照相对
- 创建新的项目的时候,文件名一直追加,不分层对于刚用idea的小白,这个问题困扰了我好几天了,幸好现在还不怎么敲代码,下面给一个详细的解决方案
- 如何快速判断一个元素是不是在一个集合里?这个题目是我最近面试的时候常问的一个问题,这个问题不同人都有很多不同的回答。今天想介绍一个很少有人会