在springboot中对kafka进行读写的示例代码
作者:冬天里的懒喵 发布时间:2023-11-26 00:48:02
标签:springboot,kafka
springboot对kafka的client很好的实现了集成,使用非常方便,本文也实现了一个在springboot中实现操作kafka的demo。
1.POM配置
只需要在dependencies中增加 spring-kafka的配置即可。完整效果如下:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>1.2.2.RELEASE</spring-kafka.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!-- spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
2.生产者
参数配置类,其参数卸载yml文件中,通过@Value注入
package com.dhb.kafka.producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class SenderConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String,Object> producerConfigs() {
Map<String,Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,this.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG,"0");
return props;
}
@Bean
public ProducerFactory<String,String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String,String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
消息发送类
package com.dhb.kafka.producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
public void send(String topic,String payload) {
log.info("sending payload='{}' to topic='{}'",payload,topic);
this.kafkaTemplate.send(topic,payload);
}
}
3.消费者
参数配置类
package com.dhb.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class ReceiverConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
public Map<String,Object> consumerConfigs() {
Map<String,Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG,"helloword");
return props;
}
@Bean
public ConsumerFactory<String,String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public Receiver receiver() {
return new Receiver();
}
}
消息接受类
package com.dhb.kafka.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.concurrent.CountDownLatch;
@Slf4j
public class Receiver {
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${kafka.topic.helloworld}")
public void receive(String payload) {
log.info("received payload='{}'",payload);
latch.countDown();
}
}
3.web测试类
定义了一个基于http的web测试接口
package com.dhb.kafka.web;
import com.dhb.kafka.producer.Sender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
@RestController
@Slf4j
public class KafkaProducer {
@Autowired
Sender sender;
@RequestMapping(value = "/sender.action", method = RequestMethod.POST)
public void exec(HttpServletRequest request, HttpServletResponse response,String data) throws IOException{
this.sender.send("testtopic",data);
response.setCharacterEncoding("UTF-8");
response.setContentType("text/json");
response.getWriter().write("success");
response.getWriter().flush();
response.getWriter().close();
}
}
4.启动类及配置
package com.dhb.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class,args);
}
}
application.yml
kafka:
bootstrap-servers: 192.168.162.239:9092
topic:
helloworld: testtopic
程序结构:
包结构
5.读写测试
通过执行KafkaApplication的main方法启动程序。然后打开postman进行测试:
运行后返回success
生产者日志:
消费者日志:
来源:http://www.jianshu.com/p/3dcb64e49ac5?utm_source=tuicool&utm_medium=referral


猜你喜欢
- C#中的很多关键词用法比较容易混淆,var和dynamic就是其中一组,他们都可以申明动态类型的变量,但是本质上他们还是有不少区别的。var
- 在jdk1.4中提出的技术,非阻塞IO,采用的是基于事件处理方式。传统的io技术为阻塞的,比如读一个文件,惹read方法是阻塞的,直到有数据
- 用Linq从一个集合选取几列得到一个新的集合-可改列名
- C# 自带的HttpWebRequest效率太低,对于自组HTTP封包不好操作。在写超级SQL注入工具时,研究了很长一段时间如何使用Sock
- java jpa自定义sql语句本篇只是为了再次记录自己又学习了jpa的使用,框架原生的通过解析方法名多适用于单表操作,自定义的sql查询则
- Android 图片选择可以达到的效果:1.第一个图片的位置放照相机,点击打开照相机2.其余的是显示全部存储的图片,点击一次是查
- 序言在flutter开发中,我们使用 bloc 框架,基于状态变更进行响应式开发。本篇文章,小轰将 bloc 核心业务块进行拆解简化,聊一聊
- 编写程序,实现顺序表的下列功能:从键盘输入数据建立一个顺序表输出该顺序表往顺序表中插入数据从顺序表中删除数据给定数据,进行查找,给出查找成功
- 我们平时在开发系统时,一般我们的系统工程会被分为多个模块,一个原因是方便协同开发,系统间解耦,另外一个很重要的原因是:别的系统需要依赖我们系
- 作者: juky_huang 事件的简单解释: 事件是对象发送的消息,以发信号通知操作的发生。操作可能是由用户交互(例如
- Spring实例Bean的方法Spring实例Bean的方法,在AbstractAutowireCapableBeanFactory中的pr
- 前面一篇文章已经写了如何搭建一个单机版Redis服务, 那么我们应该怎么在现有的系统中集成进来呢? 由于笔者使用的编程语言是Java, 所以
- 获取e.printStackTrace()打印的信息某些情况下,我们需要获取应用打印的异常信息,这时就可以借助StringWriter和Pr
- 前言因为最近的项目需要使用录音功能,开始的想法是Button+OnTouchListener+Dialog实现,在大部分手机中都没问题,只有
- 1. 确保你项目能编译通过,安装java jdk 环境填写环境变量2. 添加SpringBootServletInitializer的子类重
- 本文介绍了eclipse下搭建hibernate5.0环境的步骤,分享给大家,具体如下:hibernate引入的jar包:hibernate
- 前言这几天看《Java并发编程之美》的时候又遇到了ThradLocal这个类,不得不说,这个类在平时很多场景都遇得到,所以对其进行一个系统性
- Class.forName(xxx.xx.xx) 返回的是一个类一.首先你要明白在java里面任何class都要装载在虚拟机上才能运行。1.
- 最近项目中的活动面板要做来回滚动卡牌预览效果,感觉自己来写的话,也能写,但是可能会比较耗时,看到Github上有开源的项目,于是就借用了,G
- 前言:在Java项目中,有两个主要的构建系统:Gradle和Maven。构建系统主要管理潜在的复杂依赖关系并正确编译项目。还可以将已编译的项