软件编程
位置:首页>> 软件编程>> java编程>> spring-cloud-stream结合kafka使用详解

spring-cloud-stream结合kafka使用详解

作者:KyleYaoKeepGoing  发布时间:2022-05-19 14:32:50 

标签:spring,cloud,stream,kafka

1.pom文件导入依赖


<!-- kafka -->
<dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

2.application.yml文件配置


spring:
cloud:
 stream:
  kafka:
   binder:
    brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中间件服务器地址
  bindings:
   xxx_output: // 通道名称
    destination: xxx // 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的
    // 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json)
   xxx_input:
    destination: xxx // 消息发往的目的地,对应topic
    group: xxx // 对应kafka的group

3.创建消息发送者


@EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表示这是一个可绑定的发布通道
@Service
public class MqService {

@Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
 private MessageChannel oesWorkbenchChannel;

/**
  * 发送一条kafka消息
  */
 public boolean sendLifeData(Object object) {
   return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT);
 }
}

// 发布通道
public interface Source {
 @Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
 MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道用MessageChannel
}

4.创建消息监听者


@Slf4j
@EnableBinding(Sink.class)
public class WorkbenchStreamListener {

@Resource
 private FileService fileService;

@StreamListener(KafkaConstants.xxx_input) // 监听接受通道
 public void receiveData(MoveMessage moveMessage) {
 }
}

// 接受通道
public interface Sink {
 @Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT)
 SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel
}

接下来就可以愉快的发送监听消息了

来源:https://blog.csdn.net/oWanShiKaiTouNan/article/details/108056417

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com