spring-cloud-stream结合kafka使用详解
作者:KyleYaoKeepGoing 发布时间:2022-05-19 14:32:50
标签:spring,cloud,stream,kafka
1.pom文件导入依赖
<!-- kafka -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
2.application.yml文件配置
spring:
cloud:
stream:
kafka:
binder:
brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中间件服务器地址
bindings:
xxx_output: // 通道名称
destination: xxx // 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的
// 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json)
xxx_input:
destination: xxx // 消息发往的目的地,对应topic
group: xxx // 对应kafka的group
3.创建消息发送者
@EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表示这是一个可绑定的发布通道
@Service
public class MqService {
@Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
private MessageChannel oesWorkbenchChannel;
/**
* 发送一条kafka消息
*/
public boolean sendLifeData(Object object) {
return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT);
}
}
// 发布通道
public interface Source {
@Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道用MessageChannel
}
4.创建消息监听者
@Slf4j
@EnableBinding(Sink.class)
public class WorkbenchStreamListener {
@Resource
private FileService fileService;
@StreamListener(KafkaConstants.xxx_input) // 监听接受通道
public void receiveData(MoveMessage moveMessage) {
}
}
// 接受通道
public interface Sink {
@Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT)
SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel
}
接下来就可以愉快的发送监听消息了
来源:https://blog.csdn.net/oWanShiKaiTouNan/article/details/108056417
0
投稿
猜你喜欢
- B/S 系统中对http 请求数据的校验多数在客户端进行,这也是出于简单及用户体验性上考虑,但是在一些安全性要求高的系统中服务端校验是不可缺
- namespace PadWebServices.Model{ public static class DataTa
- 前言人类建造迷宫已有5000年的历史。在世界的不同文化发展时期,这些奇特的建筑物始终吸引人们沿着弯弯曲曲、困难重重的小路吃力地行走,寻找真相
- Spring5路径匹配器PathPatternPathPattern 对url地址匹配的处理更加快速,它和AntPathMatcher 主要
- ArrayList实现班级信息管理系统,供大家参考,具体内容如下代码如下:import java.util.*;public class D
- 文件上传是开发中十分常见的功能,在servlet3.0之前,实现文件上传需要使用一些插件技术,比如:commons-fileuploadsm
- 本文实例为大家分享了WPF实现文字粒子闪烁动画的具体代码,供大家参考,具体内容如下实现效果如下:思路:首先根据显示文本创建文本路径Geome
- 前言xxljob 是采用 java 开发的开源的任务调度系统,架构上分为调度管理器、执行器,目前除了官方提供的 java 执行器外,也有 g
- 常量是在编译时已知并在程序的生存期内不发生更改的不可变值。常量使用 const 修饰符进行声明。只有 C# 内置类型(System.Obje
- 目录概要独立文件专属文件internal storageexternal storage概要当我们查看手机的文件管理器的时候,会发现里面的文
- 对一个集合中的对象进行排序,根据对象的某个指标的大小进行升序或降序排序。代码如下:进行降序排列 进行降序排列 Co
- 一、目的针对不同地区,设置不同的语言信息。SpringBoot国际化配置文件默认放在classpath:message.properties
- 在Web开发过程中离不开数据的交互,这就需要规定交互数据的相关格式,以便数据在客户端与服务器之间进行传递。数据的格式通常有2种:1、xml;
- MyBatis的注解实现复杂映射开发实现复杂关系映射之前我们可以在映射文件中通过配置来实现,使用注解开发后,我们可以使用@Results注解
- 环境搭建项目结构图:1.我们首先做好服务端pom.xml<dependencies>
- 前文传送门:Netty分布式高性能工具类同线程下回收对象解析异线程回收对象就是创建对象和回收对象不在同一条线程的情况下, 对象回收的逻辑我们
- 本文实例讲述了Java矩阵连乘问题(动态规划)算法。分享给大家供大家参考,具体如下:问题描述:给定n个矩阵:A1,A2,...,An,其中A
- 一、Unity Shader基础1、创建和使用Shader在Unity中Shader一般由两种用途:指定给材质,用于物理渲染;指定给脚本,用
- Java CharArrayReader流一、CharArrayReader流定义API说明:该类实现了一个可用作字符输入流的字符缓冲区,即
- Docker现在很火,容器技术看上不无所不能,但这实际上是一种误解,不要被炒作出来的泡沫迷住双眼,本文抛去炒作,理性地从Java程序员的角度