详解kafka中的消息分区分配算法
作者:温故知新之java 发布时间:2021-06-02 08:16:15
背景
kafka有分区机制,一个主题topic在创建的时候,会设置分区。如果只有一个分区,那所有的消费者都订阅的是这一个分区消息;如果有多个分区的话,那消费者之间又是如何分配的呢?
分配算法
RangeAssignor
定义
Kafka默认采⽤RangeAssignor的分配算法。
RangeAssignor策略的原理是按照消费者总数和分区总数进⾏整除运算来获得⼀个跨度,然 后将分区按照跨度进⾏平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每⼀个 Topic,RangeAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然 后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配 ⼀个分区。
这种分配⽅式明显的⼀个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来 越严重,⽐如上图中4个分区3个消费者的场景,C0会多分配⼀个分区。如果此时再订阅⼀个分区 数为4的Topic,那么C0⼜会⽐C1、C2多分配⼀个分区,这样C0总共就⽐C1、C2多分配两个分区 了,⽽且随着Topic的增加,这个情况会越来越严重。
源码分析
public class RangeAssignor extends AbstractPartitionAssignor {
....
@Override
public Map> assign(Map partitionsPerTopic, Map subscriptions) {
// 1. 获取每个topic被多少个consumer订阅了
Map<String,List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
// 2. 存储最终的分配⽅案
Map<String,List<String>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList());
for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
List consumersForTopic = topicEntry.getValue();
// 3. 每个topic的partition数量
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null)
continue;
Collections.sort(consumersForTopic);
// 4. 表示平均每个consumer会分配到多少个partition
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
// 5. 平均分配后还剩下多少个partition未被分配
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
List partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
// 6. 这⾥是关键点,分配原则是将未能被平均分配的partition分配到前 consumersWithExtraPartition个consumer
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
}
场景
可以完全平均分配
无法完全平均分配,排序靠前分的更多
消费者数量大于分区数量,排名靠前先分得,排名靠后未分得分区
RoundRobinAssignor
定义
RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进⾏排序后尽 量均衡的分配(RangeAssignor是针对单个Topic的分区进⾏排序分配的)。如果消费组内,消费者订阅 的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间 分配到的分区数的差值不会超过1)。
源码分析
package org.apache.kafka.clients.consumer;
public class RoundRobinAssignor extends AbstractPartitionAssignor {
@Override
public Map> assign(Map partitionsPerTopic, Map subscriptions) {
<Map> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList()); // 1. 环状链表,存储所有的consumer,⼀次迭代完之后⼜会回到原点
CircularIterator assigner = new CircularIterator<> (Utils.sorted(subscriptions.keySet())); // 2. 获取所有订阅的topic的partition总数 for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
final String topic = partition.topic();
while (!subscriptions.get(assigner.peek()).topics().contains(topic))
assigner.next();
assignment.get(assigner.next()).add(partition);
}
return assignment;
}
.... }
场景
无法完全平均分配,排序靠前分的更多
StickyAssignor
定义
尽管RoundRobinAssignor已经在RangeAssignor上做了⼀些优化来更均衡的分配分区,但是在⼀些情况下依旧会产⽣严重的分配偏差,从字⾯意义上看,Sticky是“粘性的”,可以理解为分配结果是带“粘性的”——每⼀次分配变更相对 上⼀次分配做最少的变动(上⼀次的结果是有粘性的) 其⽬标有两点:
分区的分配尽量的均衡
每⼀次重分配的结果尽量与上⼀次分配结果保持⼀致
场景
来源:https://juejin.cn/post/7086656375685660708
猜你喜欢
- 前言:在Java面试中,线程的状态也是被经常考察的知识点,今天我们就来聊一聊线程状态的那些事!线程在JVM中的状态查看线程在JVM中有哪些不
- 1、static是什么意思?static 关键字表明一个成员变量或者是成员方法可以在没有所属的类的实例变量的情况下被访问。例如Main类pa
- springboot整合mybatis实现数据库更新批处理1.在mapper接口中编写方法/** * 修改book表中的销量和库存
- 本文汇总了Spring的常用注解,以方便大家查询和使用,具体如下:使用注解之前要开启自动扫描功能其中base-package为需要扫描的包(
- 1、java.util.concurrent.atomic 的包里有AtomicBoolean, AtomicInteger,AtomicL
- 动态方法就是一个Action对应多个请求,减少Action的数量1、指定method属性<action name="addA
- Java 异常的栈轨迹(Stack Trace)详解 捕获到异常时,往往需要进行一些处理。比较简单直接的
- 经过很多查看在巨人的肩膀上写完这篇博客,如有雷同纯属巧合,虽然自己也查了些文章才总结的,但是站在巨人肩膀上不敢搞原创!学习使用一些插件,可以
- 由于公司的开发团队偏向于使用Java技术,而且公司倡导学习开源技术,所以我选择用Java语言来进行Selenium WebDriver的自动
- 1、一个示例回顾Future一些业务场景我们需要使用多线程异步执行任务,加快任务执行速度。JDK5新增了Future接口,用于描述一个异步计
- 博主说:有时候,我们需要对数据库中现有的数据进行大量处理操作(例如表中的某个字段需要全部更新等),如果直接使用select * from t
- 1、研究背景 在当今信息社会发展中中,计算机科
- 属性问题引入前面我们已经在父工程中的dependencyManagement标签中对项目中所使用的jar包版本进行了统一的管理,但是如果在标
- 前言:IO流主要分为两大类,分别是字节流与字符流注意:1、音频文件、图片、视频(范围广),就用字节流2、只涉及到文本的,就用字符流使用字节流
- 前言之前一篇文章介绍了基本的统一异常处理思路: Spring MVC/Boot 统一异常处理最佳实践.上篇文章也有许多人提出了一些问题:如何
- 在我们的服务中不可避免的需要使用到一些秘钥(数据库、redis等)开发和测试环境还好,但生产如果采用明文配置讲会有安全问题,jasypt是一
- 最好使用英文,不要用汉语拼音1:包(package):用于将完成不同功能的类分门别类,放在不同的目录(包)下,包的命名规则:将公司域名反转作
- 前言这几天同事跟我在升级Android target SDK和build tool版本的时候,碰到了一个非常搞笑的问题,基本可以算作是“坑”
- 配置Servlet的方法有俩种,分别是传统web.xml文档中部署servlet和注解方式部署servlet,下面就先一起来学习 * 解方式部
- 前言Mybatis作为一个应用广泛的优秀的ORM框架,已经成了JavaWeb世界近乎标配的部分,这个框架具有强大的灵活性,在四大组件(Exe