Java Kafka分区发送及消费实战
作者:热黄油啤酒 发布时间:2022-08-17 20:27:58
前言
Kafka是现在非常热门的分布式消息队列,常用于微服务间异步通信,业务解耦等场景。kafka的性能非常强大,但是单个微服务吞吐性能是有上限的,我们就会用到分布式微服务,多消费者多生产者进行数据处理,保证性能同时也能根据业务量进行横向拓展,对于同一个微服务的多个实例,输入输出的topic是同一个,这时候我们就可以利用Kafka分区消费来解决这个问题。
业务场景
我们开发的是一个物联网系统,大量设备接入到平台实时发送数据,有秒级数据和分钟级别数据等等,处理流程包含接入、处理、存储,这三个模块间就是使用kafka进行数据流转,数据处理模块中包含多个微服务,单条数据会经历多次处理,部分业务耗时较长,导致在高频率接收到数据时候单体服务无法达到吞吐平衡,于是对于这些服务进行了分布式部署,多个实例进行消费处理。
业务实现
不指定分区
我们在给kafka发送消息时候,如果不指定分区,是不需要手动创建topic的,发送时没有topic,kafka会自动创建一个分区为1的topic,如下:
@Service
public class ProductService {
@Autowired
private KafkaTemplate kafkaTemplate;
public void send(String msg, String topic) {
kafkaTemplate.send(topic, msg);
}
}
指定分区
topic分区初始化及配置
指定分区发送时候,如果未配置topic分区数,指定>0的分区,会提示分区不存在,这时候我们就需要提前创建好topic及分区
手动创建,服务启动前,使用kafka tool手动创建topic 不推荐 x
自动创建,服务启动时,使用KafkaClient创建 推荐 √
/**
* 初始化多分区的topic 基于springboot2
*/
@Component
public void TopicInitRunner implements ApplicationRunner {
@Autowired
private AdminClient adminClient;
@Override
public void run(ApplicationArguments args) throws Exception {
// 通过配置文件读取自定义配置的topic名及分区数 省略...
// Key topic V 分区数
Map<String, Integer> topicPartitionMap = new HashMap<>();
for (Map.Entry<String, Integer> e : topicPartitionMap.entrySet()) {
createTopic(e.getKey(), e.getValue());
}
}
public void createTopic(String topic, int partition) {
NewTopic newTopic = new NewTopic(topic, partition);
adminClient.createTopics(Lists.newArrayList(newTopic));
}
}
/**
* 配置类参考 基于springboot2
* 如果只进行普通的单消息发送 无需添加此配置到项目中
*/
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Bean
public AdminClient adminClient() {
return AdminClient.create(kafkaAdmin().getConfig());
}
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> props = Maps.newHashMap();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
return new KafkaAdmin(props);
}
}
生产者分区发送方案
上面讲到如何初始化分区topic,这时候我们的kafka环境已经准备好了,我们先使用TopicInitRunner为我们创建一个名称为 partition-topic 分区数为三,现在讲一讲如何均匀的讲消息发送的每个分区上,如何保证多消费者实例是负载均衡的,具体方案如下:
1.因为每条消息都是设备上传的,都会有设备id,先给每个设备生成一个自增号,这样1000个设备,每个设备就会有0到999的自增号,放到缓存中,每次根据消息中的设备id获取到该设备的自增号
2.使用自增号对分区数进行取模操作,代码实现如下:
public class ProductService {
/**
* data为需要发送的数据
*/
public void partitionSend(String topic, int partition, JSONObject data) {
// 获取设备id
String deviceId = data.getString("deviceId");
// 获取自增数 如果是新设备会创建一个并放入缓存中
int inc = getDeviceInc(deviceId);
// 如果分区数为3 设备自增id为1 取模结果为1 就是发送到1分区 这样1000个设备就可以保证每个分区发送数据量是1000 / 3
int targetPartition = Math.floorMod(inc, partition);
// 分区发送时候 需要指定一个唯一k 可以使用uuid或者百度提供的雪花算法获取id 字符串即可
kafkaTemplate.send(topic, partition, getUuid(), data.toJSONString());
}
}
消费者
我们讲到消费者使用分布式部署,一个微服务有多个实例,我们只需要按照服务监听的topic分区数创建对应数目的服务实例即可,这样kafka就会自动分配对应分区的数据到每个实例。
我们采取批量消费,进一步提高服务吞吐性能,消费及配置代码如下,配置文件参考springbootkafka配置即可,主要设计kafka服务配置,消费及生产配置,比较核心的是
@Component
public class DataListener {
@Autowired
private MongoTemplate mongoTemplate;
/**
* 站点报文监听消费
*
* @param records
*/
@KafkaListener(topics = "partition-topic", containerFactory = "batchConsumerFactory")
public void iotSiteHistoryMessageConsumer(List<ConsumerRecord<String, String>> records) {
}
/**
* 消费者配置
*/
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = Maps.newHashMap();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
/**
* 批量消费配置
*/
@Bean
public KafkaListenerContainerFactory batchConsumerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.setBatchListener(true);
return factory;
}
}
来源:https://juejin.cn/post/6995746569580445709


猜你喜欢
- import java.util.LinkedList;public class OJ { public OJ() {
- C#生成指定范围内的不重复随机数// Number随机数个数// minNum随机数下限// maxNum随机数上限public int[]
- 前言我们在日常开发中,经常会用到一个系统需要链接多个数据库来实现业务的需求,比如多个系统之间数据调用、两个数据之间同步等等。今天给大家分享使
- @EventListener 异步中使用condition的问题@EventListener是spring在4.2+推出的更好的使用spri
- 本文实例讲述了C++求四个正整数最大公约数的方法。分享给大家供大家参考,具体如下:/** 作 者: 刘同宾* 完成日期:2012 年 11
- 本文先为大家介绍如何利用缓存Cache方便地实现此功能。 Cache与Session这二个状态对像的其中有一个不同之处,Cache是一个全局
- Android 7.0调用相机崩溃解决办法 错误提示:android.os.FileUriExposedException: fi
- springboot静态资源加载顺序优先级看springboot源码里面springboot静态资源加载规则我们经常会使用springboo
- 在上篇文章给大家介绍了Spring boot + mybatis + Vue.js + ElementUI 实现数据的增删改查实例代码(一)
- 一、配置xml路径mybatis-plus:mapper-locations: classpath:mapper/*.xml二、编写Mapp
- 文章目录 简介增量构建自定义inputs和outputs运行时API隐式依赖输入校验自定义缓存方法输入归一化其他使用技巧简介在我们使用的各种
- 一、介绍JUnit是一款优秀的开源Java单元测试框架,也是目前使用率最高最流行的测试框架,开发工具Eclipse和IDEA对JUnit都有
- 方法一:<uses-permission android:name="android.permission.WAKE_LOC
- 一、概述一共两个线程,一个线程生产产品,一个线程消费产品,使用同步代码块方法,同步两个线程。当产品没有时,通知生产者生产,生产者生产后,通知
- 方法一class Program { [STAThread] static
- 在android应用中,多屏滑动是一种很常见的风格,没有采用viewpager的代码实现会很长,如果采用ViewPager,代码就会短很多,
- SpringMVC常用组件DispatcherServlet:前端控制器,不需要工程师开发,由框架提供作用:统一处理请求和响应,整个流程控制
- C#中Directory.GetFiles() 函数的使用C#中Directory.GetFiles(string path , strin
- 在这篇文章中,我将向您展示如何用新的Java 8 forEach语句循环一个List和Map。1、forEach 和 Map1.1、常规循环
- 一、直接执行SQL查询:1、mappers文件节选<resultMap id="AcModelResultMap"