spring-cloud-stream结合kafka使用详解
作者:KyleYaoKeepGoing 发布时间:2022-05-19 14:32:50
标签:spring,cloud,stream,kafka
1.pom文件导入依赖
<!-- kafka -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
2.application.yml文件配置
spring:
cloud:
stream:
kafka:
binder:
brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中间件服务器地址
bindings:
xxx_output: // 通道名称
destination: xxx // 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的
// 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json)
xxx_input:
destination: xxx // 消息发往的目的地,对应topic
group: xxx // 对应kafka的group
3.创建消息发送者
@EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表示这是一个可绑定的发布通道
@Service
public class MqService {
@Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
private MessageChannel oesWorkbenchChannel;
/**
* 发送一条kafka消息
*/
public boolean sendLifeData(Object object) {
return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT);
}
}
// 发布通道
public interface Source {
@Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道用MessageChannel
}
4.创建消息监听者
@Slf4j
@EnableBinding(Sink.class)
public class WorkbenchStreamListener {
@Resource
private FileService fileService;
@StreamListener(KafkaConstants.xxx_input) // 监听接受通道
public void receiveData(MoveMessage moveMessage) {
}
}
// 接受通道
public interface Sink {
@Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT)
SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel
}
接下来就可以愉快的发送监听消息了
来源:https://blog.csdn.net/oWanShiKaiTouNan/article/details/108056417


猜你喜欢
- 在2020.1.1版本之前IDEA pom文件导包是这样的最近新装新版本IDEA之后,这个图标没有了,对于习惯旧操作没有图标了还真不习惯。就
- public List<string> GetXYFromPic(String jpgPath) {
- Android:AIDL和远程Service调用本讲的内容,理解起来很难,也许你看了很多资料也看不明白,但是用起来缺简单的要命。所以我们干脆
- SpringBoot的具体介绍可以参看其他网上介绍,这里就不多说了,就这几天的学习,个人理解,简而言之: (1)它是Spring的
- Idea运行单个main方法,不编译整个工程直接上图1、选择main方法类右键->create ‘类名.main&
- Service层:public int addUser(UserDomian user){ int i = userMapper
- 本文是针对AndBase框架学习整理的第三篇笔记,想要了解AndBase框架的朋友可以阅读本文,大家共同学习。学习内容:1.使用AndBas
- Java本身都是值传递式的调用,对于对象传递的是地址值。给地址值重新赋值等于重新指向,不会影响外层。而且这里Integer对象也有特殊性。其
- 反射实例化类public class Person{ public string Name { get; set; }publi
- C#实现的鼠标钩子,可以获取鼠标在屏幕中的坐标,记得要以管理员权限运行才行using System;using System.Collect
- Java中Stop-The-World机制简称STW,是在执行垃圾收集算法时,Java应用程序的其他所有线程都被挂起(除了垃圾收集帮助器之外
- 引言第一眼看到这个题目,我相信大家都会脑子里面弹出来一个想法:这不都是 Spring 的注解么,加了这两个注解的类都会被最终封装成 Bean
- 时间格式化在项目中使用频率是非常高的,当我们的 API 接口返回结果,需要对其中某一个 date 字段属性进行特殊的格式化处理,通常会用到
- 前言我采用的是Camera来实现自定义相机的,如果你使用的是Camera2,那本文将不适用你。为了减少篇幅,本文主要讲解手动对焦的实现方式,
- 公司的svn的地址改变了,怎么办呢。自己本地的正在修改的项目怎么办呢?修改一下svn的服务器地址咯。1.就是先关闭ide,重新打开,然后选择
- 1、修改maven的pom文件只需要将如下依赖添加到pom.xml文件中即可。(注意此处是以plugin的方式,放在<plugins&
- 新手学习记录。写在springboot test 示例 示例代码地址看结尾。后面有带页面的示例。SpringBoot Test无
- 线程同步的概念由于同一个进程的多个线程共享同一块存储空间,在带来方便的同时,也会带来访问冲突的问题:举例:public class Runn
- 删除字符串的所有标点str = str.replaceAll("[\\pP‘'“”]", ""
- 对 Excel 进行读写操作是生产环境下常见的业务,网上搜索的实现方式都是基于POI和JXL第三方框架,但都不是很全面。小编由于这两天刚好需