如何在Spring Boot中使用MQTT
作者:Houtaroy 发布时间:2023-10-08 20:39:13
目录
为什么选择MQTT
MQTT, 启动!
使用方式
Client模式
创建工厂类
创建工具类
Spring Integration
总结
为什么选择MQTT
MQTT的定义相信很多人都能讲的头头是道,本文章也不讨论什么高大上的东西,旨在用最简单直观的方式让每一位刚接触的同行们可以最快的应用起来
先从使用MQTT需要什么开始分析:
消息服务器
不同应用/设备之间的频繁交互
可能涉及一对多的消息传递
根据上面列举的这三点,我们大概可以了解到, MQTT最适合的场景是消息做为系统的重要组成部分,且参与着系统关键业务逻辑的情形
MQTT, 启动!
既然决定使用它,我们首先要研究的是如何让MQTT正常工作,毕竟它不是简单的在maven里加入个依赖就完事的
我们总共需要干如下两件事:
下载EMQX消息服务器, 作为broker
在maven中引入依赖
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.3.2.RELEASE</version>
</dependency>
完成上面两步后, 启动EMQX服务器, 正式进入我们的MQTT旅途
使用方式
在Spring Boot中使用MQTT的代码, 笔者总结了如下两种方式:
使用spring-integration的消息通道概念
使用传统的Client客户端概念
第一种会产生一定程度的心智负担,但在笔者成功搭配(抄袭+造轮子)自动注册后, 比后者要方便许多
在介绍具体代码之前, 我们先简单整理下使用中最常见的概念:
主题: MQTT消息的主要传播途径, 我们向主题发布消息, 订阅主题, 从主题中读取消息并进行业务逻辑处理, 主题是消息的通道
生产者: MQTT消息的发送者, 他们向主题发送消息
消费者: MQTT消息的接收者, 他们订阅自己需要的主题, 并从中获取消息
broker: 消息转发器, 消息是通过它来承载的, EMQX就是我们的broker, 在使用中我们不用关心它的具体实现
其实, MQTT的使用流程就是: 生产者给主题发消息->broker进行消息的传递->订阅该主题的消费者拿到消息并进行相应的业务逻辑
Client模式
本模式和传统的数据库链接,Redis链接基本一致,有开发经验的小伙伴们可以很轻松的驾驭,我们需要考虑的就是如果创建对应的工厂,是单例模式,还是原型,亦或是造个池子呢?
我们使用单例模式来进行本次的介绍
创建工厂类
首先, 我们创造一个工厂(就不承认设计模式中毒)
public class MqttFactory {
private static MqttProperties configuration;
private static MqttClient client;
/**
* 获取客户端实例
* 单例模式, 存在则返回, 不存在则初始化
*/
public static MqttClient getInstance() {
if (client == null) {
init();
}
return client;
}
/**
* 初始化客户端
*/
public static void init() {
try {
client = new MqttClient(configuration.getAddress(), "client-" + System.currentTimeMillis());
// MQTT配置对象
MqttConnectOptions options = new MqttConnectOptions();
// 设置自动重连, 其它具体参数可以查看MqttConnectOptions
options.setAutomaticReconnect(true);
if (!client.isConnected()) {
client.connect(options);
}
} catch (MqttException e) {
LOGGER.error(String.format("MQTT: 连接消息服务器[%s]失败", configuration.getAddress()));
}
}
}
关于MQTT的具体配置可以查看MqttConnectOptions, 在这里就不做说明了
多嘴一句, 文档永远比某些博客给力!!!
创建工具类
接下来, 我们创建MqttUtil, 用于消息的发送以及主题的订阅
public class MqttUtil {
/**
* 发送消息
* @param topic 主题
* @param data 消息内容
*/
public static void send(String topic, Object data) {
// 获取客户端实例
MqttClient client = MqttFactory.getInstance();
ObjectMapper mapper = new ObjectMapper();
try {
// 转换消息为json字符串
String json = mapper.writeValueAsString(data);
client.publish(topic, new MqttMessage(json.getBytes(StandardCharsets.UTF_8)));
} catch (JsonProcessingException e) {
LOGGER.error(String.format("MQTT: 主题[%s]发送消息转换json失败", topic));
} catch (MqttException e) {
LOGGER.error(String.format("MQTT: 主题[%s]发送消息失败", topic));
}
}
/**
* 订阅主题
* @param topic 主题
* @param listener 消息监听处理器
*/
public static void subscribe(String topic, IMqttMessageListener listener) {
MqttClient client = MqttFactory.getInstance();
try {
client.subscribe(topic, listener);
} catch (MqttException e) {
LOGGER.error(String.format("MQTT: 订阅主题[%s]失败", topic));
}
}
}
相信小伙伴们注意到了IMqttMessageListener这个东西, 我们只需要创建一个监听类, 实现IMqttMessageListener接口, 就可以处理消息啦, 代码如下:
public class MessageListener implements IMqttMessageListener {
/**
* 处理消息
* @param topic 主题
* @param mqttMessage 消息
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
LOGGER.info(String.format("MQTT: 订阅主题[%s]发来消息[%s]", topic, new String(mqttMessage.getPayload())));
}
public static void main(String[] args) {
//订阅主题test01, 使用MessageListener来处理它的消息
MqttUtil.subscribe("test01", new MessageListener());
}
}
无论是发送还是订阅,是不是都很好理解?
舒服的事情结束后, 带来的是无尽的折磨和空虚, 来吧, 让我们挑战下心智负担大的第二种模式!
Spring Integration
什么是Spring Integration?对不起,我不知道,我也不想知道
为什么使用Spring Integration?因为它真的很好维护
网上大部分教程都是针对Spring Integration的, 可能是我第一次接触, 千篇一律看的我莫名其妙, 所以我选择放弃了他们, 选择了大神的自动配置方式,并在其基础上,针对心智负担进行了相应的调整
还记得我们之前讨论过的概念吗?主题/生产者/消费者
在Spring Integration中,我们新加入一些概念, 并把之前的进行微调:
通道: 消息传输和接受的管道, 每一条消息都是通过它钻进钻出
客户端工厂: 用于创建MQTT客户端, 和模式一中的类似
消息适配器: 用于接收MQTT消息, 进行转换, 但不参与业务逻辑
入站通道: 搭配消息适配器, 消息进入站台的通道
出站通道: 搭配客户端工厂, 消息发出站台的通道
主题: 还是主题, 它不变
生产者: 拥有出站通道的家伙
消费者: 拥有入站通道的家伙
如果能渐渐理解上面定义的话, 这种模式的流程其实可以变成这样:
生产者: 创建指定客户端工厂的出站通道->发送消息
消费者: 创建指定消息适配器的入站通道->接收消息->进入消息 * ->业务逻辑
其实在笔者看来, 这符合Spring Boot的理念, 约定优于配置
代码已挪入公司 * , 待后续个人 * 配置好后再补充笔记
总结
MQTT作为消息服务, 能够满足我们大部分的开发需求, 但还有一些遗留问题笔者还没进行过深入思考和实践:
如何利用qos机制保证数据不会丢失
消息的队列和排序
集群模式下的应用
来源:https://juejin.cn/post/6953134217547120654
猜你喜欢
- 一、题目描述二、思路语法基础:StringBuilder 类似列表,可以更改元素。package Practice;public class
- 本篇紧接上一篇内容继续,还是从继承里的细节开始1.代码块初始化关于代码块的定义和使用因为之前已经进行过介绍,所以这里就不再赘述,我们所关注的
- 1.这是一个通过Java反射机制解析的工具类2.使用时只需创建对应的对象,并在Excel的第一行填上对应的属性名3.首先要添加相关的jar包
- 年纪大了,以前做过的东西过阵子还是会忘,今天使用jenkins持续集成工具时用到了eclipse上传新maven工程至svn,上传完毕后改了
- 问题springboot 集成springcloud时常常由于版本问题而报错,如下:com.sun.jersey.api.client.Cl
- 反射方式获取JPA Entity属性和值在记录日志或者调试的时候,往往需要输出数据库查询或者写入的值,或者在接 * 互的时候,可能需要将实体转
- 现在很多流行的框架,都可以很快的把分页效果做出来,但是作为一名程序员你必须得知道手写分页的流程:场景效果:一、分页的思路首先我们得知道写分页
- Spring Boot如何实现分布式系统中的服务发现和注册?随着互联网的快速发展,越来越多的企业开始将自己的业务迁移到分布式系统中。在这种情
- 从接触springboot开始,便深深的被它的简洁性深深的折服了,精简的配置,方便的集成,使我再也不想用传统的ssm框架来搭建项目,一大堆的
- 记录一下工作流的在Springboot中的使用,,顺便写个demo,概念,什么东西的我就不解释了,如有问题欢迎各位大佬指导一下。1.创建sp
- 本文实例讲述了Java模拟死锁发生之演绎哲学家进餐问题。分享给大家供大家参考,具体如下:一 点睛常见的死锁形式:当线程1已经占据资源R1,并
- 简介Exchanger是java 5引入的并发类,Exchanger顾名思义就是用来做交换的。这里主要是两个线程之间交换持有的对象。当Exc
- 本文实例讲述了C++语言实现线性表之链表实现方法。分享给大家供大家参考。具体分析如下:插入、删除结点的代码有点多,但这样提高了代码的可读性,
- SpringBoot 集成 activiti 基础环境搭建添加依赖<dependency> <groupId&g
- 目录一、导入依赖二、前端实现三、后台逻辑三、页面效果四、可能会遇到的问题一、导入依赖这里还是用了Apache的POI插件,现在一般的spri
- 本文实例讲述了Java设计模式之 * 模式。分享给大家供大家参考,具体如下:前面介绍了静态代理模式, * 比静态代理模式更加强大。它能在
- 前言最近写了一篇博客是关于 使用Jenkins来构建SVN+Maven项目 ,这里使用的的代码版本工具是SVN,但是事实上也有很多公司使用G
- 上一次接触到编码的知识,还是上大学的时候,那时候学的是通信工程专业,有关编码的内容,不记得是在通信原理还是信息论与编码里面学到的了。却依然记
- 需求:用户和账户一对一关系,查询账户时实现用户的延迟加载思路:根据id查询,需要延迟加载的一方1、用户实体类package com.yl.b
- 概述还没玩过Spring Boot,现在越来越多的公司在用了,不得不学习了。本篇是Spring Boot的开篇,简单介绍一下如何创建一个Sp