Spring boot集成Kafka消息中间件代码实例
作者:墨营 发布时间:2022-11-06 21:53:48
一.创建Spring boot项目,添加如下依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.41</version>
</dependency>
二.配置文件
server.port=4400
#kafka配置
#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.102.88:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id=jkafka.demo
#earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
#latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
#none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=100
# 指定消费者消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
三.编辑消息实体
@Data
public class Message implements Serializable{
/**
*
*/
private static final long serialVersionUID = 2522280475099635810L;
//消息ID
private String id;
//消息内容
private String msg;
// 消息发送时间
private Date sendTime;
}
四.消息发送类
@Component
public class KfkaProducer {
private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic,Message message) {
try {
logger.info("正在发送消息...");
kafkaTemplate.send(topic,JSON.toJSONString(message));
logger.info("发送消息成功 ----->>>>> message = {}", JSON.toJSONString(message));
} catch (Exception e) {
e.getMessage();
}
}
}
五.发现监听接收类
@Component
public class KfkaListener {
private static Logger logger = LoggerFactory.getLogger(KfkaListener.class);
@KafkaListener(topics = {"hello"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
logger.info("接收消息------------ record =" + record);
logger.info("接收消息----------- message =" + message);
}
}
}
六.定时发送信息测试类
@EnableScheduling
@Component
public class PublisherController {
private static final Logger log = LoggerFactory.getLogger(PublisherController.class);
@Autowired
private KfkaProducer kfkaProducer;
@Scheduled(fixedRate = 5000)
public void pubMsg() {
Message msg=new Message();
msg.setId(UUID.randomUUID().toString());
msg.setMsg("发送这条消息给你,你好啊!!!!!!");
msg.setSendTime(new Date());
kfkaProducer.send("hello", msg);;
log.info("Publisher sendes Topic... ");
}
}
七.测试结果
来源:https://blog.51cto.com/13501268/2494869
猜你喜欢
- 本文实例讲述了java GUI编程之paint绘制操作。分享给大家供大家参考,具体如下:import java.awt.*;public c
- Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Conditi
- 最近“全网域(Web Scale)”一词被炒得火热,人们也正在通过扩展他们的应用程序架构来使他们的系统变得更加“全网域”。但是究竟什么是全网
- java 闰年判断前言:给定一个年份,判断这一年是不是闰年。当以下情况之一满足时,这一年是闰年:1. 年份是4的倍数而不是100的倍数;2.
- 实现要求1、使用Java图形界面组件设计软件,界面如图所示。2、软件能够满足基本的“加、减、乘、除"等运算要求。3、程序代码清晰,
- IDEA快速搭建spring boot项目1.创建项目老规矩,点击Create New Project2.编写控制器在com.demo.sp
- Strut2判断是否是AJAX调用1. AJAX与传统Form表单实际上,两者一般都是通过HTTP的POST请求。区
- 最近在项目开发中,使用spring boot+mybatis的架构,数据库设计主键id时定义为bigint类型,使用mybatis的自动生成
- 前言先简单介绍下我们的使用场景,线上5台Broker节点的kafka承接了所有binlog订阅的数据,用于Flink组件接收数据做数据中台的
- Android 自动获取验证码的两种方式分别是BroadcastReceiver及ContentObserver,两种方式都需要进行注册、取
- 此方案适用于解决springboot项目运行时动态添加数据源,非静态切换多数据源!!!一、多数据源应用场景:1.配置文件配置多数据源,如默认
- Map是键值对的集合,又叫作字典或关联数组等,是最常见的数据结构之一。在java如何让一个map按value排序呢? 看似简单,但却不容易!
- 详解java 中Spring jsonp 跨域请求的实例jsonp介绍  
- 1. 简单说明嗨,大家好!今天给大家分享的是Mybatis-plus 插件的分页机制,说起分页机制,相信我们程序员都不陌生,今天,我就给大家
- Spring整合mybatis的mapper生成过程mapperScannerConfigurer实现了BeandifinitionRegi
- 本文实例为大家分享了Unity实现场景漫游相机的具体代码,供大家参考,具体内容如下前言拿到场景后总喜欢在场景里面玩一段时间,那这个脚本就是你
- 前言假设项目打包后,项目结构为:此时如果需要再windows环境中进行项目的启动或关闭,需要频繁的手敲命令,很不方便。此时可以编写.bat脚
- 本篇实例内容是关于C#读取CAD文件的,直接看代码//在不使用任务插件的情况下读取DWG文件的缩略图,以便在没有安装AutoCAD的计算机上
- 此解决方案是针对window的,因为日志默认保存路径在C盘,linux忽略。学习RocketMQ过程中,总是出现com.alibaba.ro
- Java NIO(New IO)是Java 1.4版本中引入的一套全新的IO处理机制,与之前的传统IO相比,NIO具有更高的可扩展性和灵活性