软件编程
位置:首页>> 软件编程>> java编程>> Spring boot集成Kafka消息中间件代码实例

Spring boot集成Kafka消息中间件代码实例

作者:墨营  发布时间:2022-11-06 21:53:48 

标签:Spring,boot,Kafka,消息,中间件

一.创建Spring boot项目,添加如下依赖


<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
   <dependency>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
     <optional>true</optional>
   </dependency>
   <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
   <dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
   </dependency>
   <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
   <dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
   </dependency>
   <dependency>
     <groupId>com.alibaba</groupId>
     <artifactId>fastjson</artifactId>
     <version>1.2.41</version>
   </dependency>

二.配置文件

server.port=4400

#kafka配置
#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.102.88:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id=jkafka.demo
#earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
#latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
#none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=100
# 指定消费者消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

三.编辑消息实体


@Data
public class Message implements Serializable{

/**
  *
  */
 private static final long serialVersionUID = 2522280475099635810L;

//消息ID
 private String id;

//消息内容
 private String msg;

// 消息发送时间
 private Date sendTime;

}

四.消息发送类


@Component
public class KfkaProducer {

private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);

@Autowired
 private KafkaTemplate<String, String> kafkaTemplate;

public void send(String topic,Message message) {
   try {
     logger.info("正在发送消息...");
     kafkaTemplate.send(topic,JSON.toJSONString(message));
     logger.info("发送消息成功 ----->>>>> message = {}", JSON.toJSONString(message));
   } catch (Exception e) {
     e.getMessage();
   }

}
}

五.发现监听接收类


@Component
public class KfkaListener {

private static Logger logger = LoggerFactory.getLogger(KfkaListener.class);

@KafkaListener(topics = {"hello"})
 public void listen(ConsumerRecord<?, ?> record) {
   Optional<?> kafkaMessage = Optional.ofNullable(record.value());
   if (kafkaMessage.isPresent()) {
     Object message = kafkaMessage.get();
     logger.info("接收消息------------ record =" + record);
     logger.info("接收消息----------- message =" + message);
   }
 }
}

六.定时发送信息测试类


@EnableScheduling
@Component
public class PublisherController {

private static final Logger log = LoggerFactory.getLogger(PublisherController.class);

@Autowired
 private KfkaProducer kfkaProducer;

@Scheduled(fixedRate = 5000)
 public void pubMsg() {
   Message msg=new Message();
   msg.setId(UUID.randomUUID().toString());
   msg.setMsg("发送这条消息给你,你好啊!!!!!!");
   msg.setSendTime(new Date());
   kfkaProducer.send("hello", msg);;
   log.info("Publisher sendes Topic... ");
 }
}

七.测试结果

Spring boot集成Kafka消息中间件代码实例

来源:https://blog.51cto.com/13501268/2494869

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com