springboot项目配置多个kafka的示例代码
作者:CccccDi 发布时间:2023-11-23 23:15:29
标签:springboot,kafka
1.spring-kafka
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
2.配置文件相关信息
kafka.bootstrap-servers=localhost:9092
kafka.consumer.group.id=20230321
#可以并发消费的线程数 (通常与partition数量一致)
kafka.consumer.concurrency=10
kafka.consumer.enable.auto.commit=false
kafka.bootstrap-servers.pic=localhost:29092
kafka.consumer.group.id.pic=20230322_pic
kafka.consumer.concurrency.pic=10
kafka.consumer.enable.auto.commit.pic=false
3.kafka配置类
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.group.id}")
private String groupId;
@Value("${kafka.consumer.concurrency}")
private int concurrency;
@Value("${kafka.consumer.enable.auto.commit}")
private String autoCommit;
@Value("${kafka.bootstrap-servers}")
private String bootstrapServer;
@Value("${kafka.consumer.group.id.pic}")
private String groupIdPic;
@Value("${kafka.consumer.concurrency.pic}")
private int concurrencyPic;
@Value("${kafka.consumer.enable.auto.commit.pic}")
private String autoCommitPic;
@Value("${kafka.bootstrap-servers.pic}")
private String bootstrapServerPic;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
String bootstrapServers = bootstrapServer;
Map<String, Object> configProps = new HashMap<>(16);
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactoryPic() {
String bootstrapServers = bootstrapServerPic;
Map<String, Object> configProps = new HashMap<>(16);
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic);
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryPic());
factory.setConcurrency(concurrencyPic);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
4.消费主题消息
@KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic")
public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
try {
String jsonString = message.value();
if (StringUtils.isNoneBlank(jsonString)) {
log.info("消费:{}",jsonString);
//TODO ....
}
} catch (Exception e) {
log.error(" receive topic error ", e);
} finally {
ack.acknowledge();
}
}
@KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory")
public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
try {
if (StringUtils.isNoneBlank(message.value())) {
//TODO ....
}
} catch (Exception e) {
logger.error(" receive topic error ", e);
} finally {
ack.acknowledge();
}
}
来源:https://www.cnblogs.com/128-cdy/p/17293059.html


猜你喜欢
- Android UI 实现 * 详解listview 的使用步骤简单的listview * 实现1.实现效果图2.需要掌握的知识listvi
- 本文实例为大家分享了Android实现简易计算功能的具体代码,供大家参考,具体内容如下效果如图:activity_main.xml<?
- Java 反射机制介绍Java 反射机制。通俗来讲呢,就是在运行状态中,我们可以根据“类的部分已经的信息”来还原“类的全部的信息”。这里“类
- 基于 springboot+vue 的测试平台(练手项目)开发继续更新。接下来准备开发请求断言功能。关于这个功能要实现哪些需求,长什么样子,
- Java实现简单的类似QQ聊天工具,供大家参考,具体内容如下所使用到的知识点:java socket编程之TCP协议java Swing简单
- 问题描述:eclipse启动tomcat时,不能加载web项目,但是把war包丢进tomcat的webapps下是能正常运行的解决方式:右键
- 前言笔者上次用C#写.Net代码差不多还是10多年以前,由于当时Java已经颇具王者风范,Net几乎被打得溃不成军。因此当时笔者对于这个.N
- 最近有很多小伙伴给我留言,分布式系统时代,线程并发,资源抢占,"锁" 慢慢变得很重要。那么常见的锁都有哪些?今天Tom哥
- 在C#中,当引用类型需要转换的时候,经常会用到关键字is、as以及显式强转。本篇来体验这三者的用法。先来梳理.NET引用类型转换的"
- select 相当于 for 循环select id from IDArrayLinkedList a = new LinkedList()
- 本文实例为大家分享了java实现省市区转换成树形结构的具体代码,供大家参考,具体内容如下前言:为什我想写这篇博客呢?第一方面是记录,另一方面
- 本文实例讲述了java实现的n*n矩阵求值及求逆矩阵算法。分享给大家供大家参考,具体如下:先来看看运行结果:java版的写出来了,用的跟c语
- 以前只知道控件的onTouchEvent()事件,它的动作有MotionEvent.ACTION_DOWN、MotionEvent.ACTI
- 前言Spring 事务注解 @Transactional 本来可以保证原子性,如果事务内有报错的话,整个事务可以保证回滚,但是加上try c
- 本文实例为大家分享了Android使用Gridview单行横向滚动显示的具体代码,供大家参考,具体内容如下要想实现滚动显示,layout布局
- springBoot所有依赖和配置文件都写好的情况下1、dao接口的实现方法package com.cy.pj.sys.dao;import
- 一般在web应用中,对客户端提交上来的图片肯定需要进行压缩的。尤其是比较大的图片,如果不经过压缩会导致页面变的很大,打开速度比较慢,当然了如
- SpringBoot线程池和Java线程池的用法和实现原理使用默认的线程池方式一:通过@Async注解调用public class Asyn
- boolean isGBK(String s) throws UnsupportedEncodingException { if(s.equ
- 一、数据类型与变量的介绍在程序运行的过程中计算机需要记录大量的状态 数据(这里我们统称数据)。那这些数据都存放在哪呢?程序在运行过程中的数据