Spring boot集成Kafka消息中间件代码实例
作者:墨营 发布时间:2022-11-06 21:53:48
一.创建Spring boot项目,添加如下依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.41</version>
</dependency>
二.配置文件
server.port=4400
#kafka配置
#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.102.88:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id=jkafka.demo
#earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
#latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
#none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=100
# 指定消费者消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
三.编辑消息实体
@Data
public class Message implements Serializable{
/**
*
*/
private static final long serialVersionUID = 2522280475099635810L;
//消息ID
private String id;
//消息内容
private String msg;
// 消息发送时间
private Date sendTime;
}
四.消息发送类
@Component
public class KfkaProducer {
private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic,Message message) {
try {
logger.info("正在发送消息...");
kafkaTemplate.send(topic,JSON.toJSONString(message));
logger.info("发送消息成功 ----->>>>> message = {}", JSON.toJSONString(message));
} catch (Exception e) {
e.getMessage();
}
}
}
五.发现监听接收类
@Component
public class KfkaListener {
private static Logger logger = LoggerFactory.getLogger(KfkaListener.class);
@KafkaListener(topics = {"hello"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
logger.info("接收消息------------ record =" + record);
logger.info("接收消息----------- message =" + message);
}
}
}
六.定时发送信息测试类
@EnableScheduling
@Component
public class PublisherController {
private static final Logger log = LoggerFactory.getLogger(PublisherController.class);
@Autowired
private KfkaProducer kfkaProducer;
@Scheduled(fixedRate = 5000)
public void pubMsg() {
Message msg=new Message();
msg.setId(UUID.randomUUID().toString());
msg.setMsg("发送这条消息给你,你好啊!!!!!!");
msg.setSendTime(new Date());
kfkaProducer.send("hello", msg);;
log.info("Publisher sendes Topic... ");
}
}
七.测试结果
来源:https://blog.51cto.com/13501268/2494869


猜你喜欢
- 今天启动springboot项目时失败了解决检查原因发现是启动类的MapperScan("")的值写到类名了,改成类所在
- 本文介绍了Maven+Tomcat8 实现自动化部署的方法,分享给大家,具体如下:1.配置tomcat-users.xml首先在Tomcat
- 本文通过两个方法:(1)计算总的页数。 (2)查询指定页数据,实现简单的分页效果。思路:首先得在 DAO 对象中提供分页查询的方法,在控制层
- ViewStub可以在运行时动态的添加布局。帮助文档给定的定义是:"A ViewStub is an invisible, zer
- 迭代器模式,一直没用过,也不会用。恰巧MyBatis框架中也使用到了迭代器模式,而且看起来还比较简单,在以后的工作中,若有需要咱们可模仿它的
- 前言在C/S这种模式中,自动更新程序就显得尤为重要,它不像B/S模式,直接发布到服务器上,浏览器点个刷新就可以了。由于涉及到客户端文件,所以
- 本文实例讲述了Java基于socket实现的客户端和服务端通信功能。分享给大家供大家参考,具体如下:以下代码参考马士兵的聊天项目,先运行Ch
- 前面的文章介绍了如何进行权限控制,即访问控制器或者方法的时候,要求当前用户必须具备特定的权限,但是如何在程序中进行权限的分配呢?下面就介绍下
- 前言map的迭代删除,和我们常见的list,set不太一样,不能直接获取Iteraotr对象,提供的删除方法也是单个的,根据key进行删除,
- 文章主要涉及到以下几个问题:怎么实现Java的序列化为什么实现了java.io.Serializable接口才能被序列化transient的
- 本文实例为大家分享了java库存管理系统的具体代码,供大家参考,具体内容如下模拟真实的库存管理逻辑,完成超市管理系统的日常功能实现。经过分析
- **前言:**我们项目中可能有这种需求,每个人请求了哪些接口?做了什么事情?参数是什么?重要的接口我们需要记录操作日志以便查找。操作日志和系
- 问题描述如果直接获取某个json数组中的元素将得到如下的json{ "44": { "height
- 上一篇中我们介绍了自定义实现BaseAdapter的普通实现布局,然而上一章也说了普通实现的方式效率会很低,而且对系统开销也很大,所以,那样
- 本文实例为大家分享了Qt实现计算器功能的具体代码,供大家参考,具体内容如下该计算器主要通过lineEdit获取和显示数字,通过tablevi
- 文章描述这个程序也记不清是什么时候写的了,犹记得那时我还很年轻,偶然从网上看到了这样一个类似的标题(AI五子棋的实现),进去后看到那个是ja
- Remote Debug 综述当我们的后台项目部署到服务器上时,由于环境和本地不同,有时候也会有一些奇奇怪怪的问题出现。只依赖服务器上的日志
- 要获得打印机的状态,应该定义一个联合.enum PrinterStatus { 其他状态= 1, 未知, 空闲
- springboot 长轮询实现基于 @EnableAsync , @Sync@SpringBootApplication@EnableAs
- Kotlin中函数都是头等的,这意味着它可以存储在变量与数据结构中、作为参数传递给其他高阶函数以及从其他高阶函数返回。可以向操作任何其他非函