Springboot整合mqtt服务的示例代码
作者:李泰山 发布时间:2022-07-20 02:58:01
标签:Springboot,mqtt
首先在pom文件里引入mqtt的依赖配置
<!-- Eclipse Paho MQTT v3 client library -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.4</version>
</dependency>
其次在 Spring Boot 的 yml 配置文件中添加 mqtt 的服务配置(注意保留 YAML 的层级缩进)
spring:
  mqtt:
    # Broker URI used both as the server URI and for constructing the client.
    url: tcp://127.0.0.1:1883
    # Client identifier presented to the broker.
    client-id: niubility-tiger
    # Credentials are only applied when they are non-blank.
    username:
    password:
    # Topic filters subscribed to at startup.
    topic: [/unify/test]
创建 MqttProperties配置参数类
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * Holder for the {@code spring.mqtt.*} settings.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {

    /** Broker URI, for example {@code tcp://127.0.0.1:1883}. */
    private String url;

    /** Client identifier used when creating the MQTT client. */
    private String clientId;

    /** Optional user name for the broker connection. */
    private String username;

    /** Optional password for the broker connection. */
    private String password;

    /** Topic filters the client subscribes to. */
    private String[] topic;
}
创建 MqttConfiguration 配置类
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.listener.MqttSubscribeListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that builds the Paho connect options and a connected,
 * subscribed MQTT client from {@link MqttProperties}.
 */
@Configuration
@EnableConfigurationProperties({MqttProperties.class})
public class MqttConfiguration {
    private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class);

    @Autowired
    private MqttProperties mqttProperties;

    public MqttConfiguration() {
    }

    /**
     * Builds the connection options from the configured properties.
     *
     * <p>User name and password are applied only when the respective property is non-blank.
     *
     * @return connect options with server URI, optional credentials and a 60 second keep-alive
     */
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(new String[]{this.mqttProperties.getUrl()});
        // Guard on the user name itself; the previous check tested the URL by mistake,
        // so a blank user name was passed through whenever a URL was configured.
        if (Func.isNotBlank(this.mqttProperties.getUsername())) {
            connectOptions.setUserName(this.mqttProperties.getUsername());
        }
        if (Func.isNotBlank(this.mqttProperties.getPassword())) {
            connectOptions.setPassword(this.mqttProperties.getPassword().toCharArray());
        }
        connectOptions.setKeepAliveInterval(60);
        return connectOptions;
    }

    /**
     * Creates the MQTT client, connects it and subscribes to every configured topic.
     *
     * @param options connect options used for the broker connection
     * @return the connected client
     * @throws MqttException if creating the client, connecting or subscribing fails
     */
    @Bean
    public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException {
        IMqttClient mqttClient = new MqttClient(this.mqttProperties.getUrl(), this.mqttProperties.getClientId());
        mqttClient.connect(options);

        String[] topics = this.mqttProperties.getTopic();
        if (topics == null) {
            // No topics configured: keep the connected client usable instead of failing startup with an NPE.
            log.warn("No MQTT topics configured under spring.mqtt.topic; nothing subscribed");
            return mqttClient;
        }
        for (String topic : topics) {
            mqttClient.subscribe(topic, new MqttSubscribeListener());
        }
        return mqttClient;
    }
}
创建 订阅事件类
import org.springframework.context.ApplicationEvent;
/**
 * Application event carrying a received MQTT message.
 *
 * <p>The event source is the message content; the topic it arrived on is kept separately.
 */
public class UWBMqttSubscribeEvent extends ApplicationEvent {

    /** Topic the message was received on. */
    private final String topic;

    /**
     * @param topic  topic the message was received on
     * @param source message content, exposed through {@code getSource()}
     */
    public UWBMqttSubscribeEvent(String topic, Object source) {
        super(source);
        this.topic = topic;
    }

    /**
     * @return topic the message was received on
     */
    public String getTopic() {
        return topic;
    }
}
创建订阅消息监听器 MqttSubscribeListener
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.ubw.event.UWBMqttSubscribeEvent;
import java.nio.charset.StandardCharsets;
/**
 * Paho message listener that republishes every received message as a
 * {@link UWBMqttSubscribeEvent} through {@code SpringUtil}.
 */
public class MqttSubscribeListener implements IMqttMessageListener {

    /**
     * Decodes the payload as text and publishes it as an application event.
     *
     * @param topic       topic the message arrived on
     * @param mqttMessage received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        // Decode explicitly as UTF-8 so the result does not depend on the JVM's default charset.
        String content = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
        UWBMqttSubscribeEvent event = new UWBMqttSubscribeEvent(topic, content);
        SpringUtil.publishEvent(event);
    }
}
创建 mqtt 消息事件异步处理类 MqttEventListener(需在启动类或配置类上添加 @EnableAsync,@Async 才会生效)
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.config.MqttProperties;
import org.springblade.ubw.event.UWBMqttSubscribeEvent;
import org.springblade.ubw.service.MqttService;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
/**
 * Listens for {@link UWBMqttSubscribeEvent}s and forwards their content to {@link MqttService}.
 */
@Configuration
public class MqttEventListener {
    private static final Logger log = LoggerFactory.getLogger(MqttEventListener.class);

    @Resource
    private MqttProperties mqttProperties;

    @Resource
    private MqttService mqttService;

    /**
     * Strips the configured subscription prefix from a concrete topic.
     *
     * <p>Each configured topic has its {@code #} characters removed to form a prefix. The first
     * prefix the given topic starts with is cut off from the front of the topic.
     *
     * @param topic concrete topic the message arrived on
     * @return the remainder of the topic after the matching prefix, or an empty string when no
     *         configured prefix matches
     */
    private String processTopic(String topic) {
        for (String configured : mqttProperties.getTopic()) {
            String prefix = configured.replace(StringPool.HASH, StringPool.EMPTY);
            if (topic.startsWith(prefix)) {
                // Cut only the leading prefix; String.replace would also remove later
                // occurrences of the same text inside the topic.
                return topic.substring(prefix.length());
            }
        }
        return StringPool.EMPTY;
    }

    /**
     * Handles a received MQTT message event and dispatches it to the service layer.
     *
     * <p>Events whose source is empty are ignored.
     *
     * @param event event carrying the topic and the message content
     */
    @Async
    @EventListener(UWBMqttSubscribeEvent.class)
    public void listen(UWBMqttSubscribeEvent event) {
        Object source = event.getSource();
        if (Func.isEmpty(source)) {
            return;
        }
        String topic = processTopic(event.getTopic());
        mqttService.issue(topic, source);
        // For troubleshooting, the stripped topic and the payload can be logged here via `log`.
    }
}
创建MqttService 数据处理服务类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.area.entity.WorkArea;
import org.springblade.ubw.area.entity.WorkSite;
import org.springblade.ubw.area.entity.WorkSiteNeighbourInfo;
import org.springblade.ubw.area.entity.WorkSitePassInfo;
import org.springblade.ubw.area.service.WorkAreaService;
import org.springblade.ubw.area.service.WorkSiteNeighbourInfoService;
import org.springblade.ubw.area.service.WorkSitePassInfoService;
import org.springblade.ubw.area.service.WorkSiteService;
import org.springblade.ubw.constant.UbwConstant;
import org.springblade.ubw.history.entity.HistoryLocusInfo;
import org.springblade.ubw.history.entity.HistoryOverTimeSosAlarmInfo;
import org.springblade.ubw.history.service.HistoryLocusInfoService;
import org.springblade.ubw.history.service.HistoryOverTimeSosAlarmInfoService;
import org.springblade.ubw.loc.entity.LocStatusInfo;
import org.springblade.ubw.loc.entity.LocStatusInfoHistory;
import org.springblade.ubw.loc.service.LocStatusInfoHistoryService;
import org.springblade.ubw.loc.service.LocStatusInfoService;
import org.springblade.ubw.msg.entity.*;
import org.springblade.ubw.msg.service.*;
import org.springblade.ubw.system.entity.*;
import org.springblade.ubw.system.service.*;
import org.springblade.ubw.system.wrapper.MqttWrapper;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Dispatches MQTT message content to the persistence service that matches the topic.
 */
@Service
public class MqttService {
    private static final Logger log = LoggerFactory.getLogger(MqttService.class);

    @Resource
    private EmployeeAndDepartmentService employeeAndDepartmentService;
    @Resource
    private VehicleInfoService vehicleInfoService;
    @Resource
    private WorkSiteService workSiteService;
    @Resource
    private LocStatusInfoService locStatusInfoService;
    @Resource
    private LocStatusInfoHistoryService locStatusInfoHistoryService;
    @Resource
    private LocOverTimeSosAlarminfoService locOverTimeSosAlarminfoService;
    @Resource
    private LocAreaOverSosAlarminfoService locAreaOverSosAlarmInfoService;
    @Resource
    private LocSosAlarminfoService locSosAlarminfoService;
    @Resource
    private AttendanceInfoService attendanceInfoService;
    @Resource
    private HistoryLocusInfoService historyLocusInfoService;
    @Resource
    private WorkSitePassInfoService workSitePassInfoService;
    @Resource
    private EnvironmentalMonitorInfoService environmentalMonitorInfoService;
    @Resource
    private TrAlertService trAlertService;
    @Resource
    private AddEvacuateInfoService addEvacuateInfoService;
    @Resource
    private CancelEvacuateInfoService cancelEvacuateInfoService;
    @Resource
    private WorkSiteNeighbourInfoService workSiteNeighbourInfoService;
    @Resource
    private LinkMsgAlarmInfoService linkMsgAlarmInfoService;
    @Resource
    private LeaderEmployeeInfoService leaderEmployeeInfoService;
    @Resource
    private ElectricMsgInfoService electricMsgInfoService;
    @Resource
    private WorkAreaService workAreaService;
    @Resource
    private HistoryOverTimeSosAlarmInfoService historyOverTimeSosAlarmInfoService;
    @Resource
    private SpecialWorksService specialWorksService;
    @Resource
    private AttendanceLocusInfoService attendanceLocusInfoService;
    @Resource
    private WorkTypeService workTypeService;
    @Resource
    private OfficePositionService officePositionService;
    @Resource
    private ClassTeamService classTeamService;

    /**
     * Dispatches a message to the service that matches its topic.
     *
     * <p>For most topics the content is converted to an entity list with {@code MqttWrapper}
     * and saved in a batch. Some topics first delete all existing rows and then save the new
     * list. In those cases the content is converted before anything is deleted, so a conversion
     * failure does not leave the table empty. Unknown topics and a {@code null} topic are ignored.
     *
     * <p>NOTE(review): the delete-then-save pairs are not visibly wrapped in a transaction
     * here; confirm whether the called services or the caller provide one.
     *
     * @param topic  topic key, compared against the {@code UbwConstant.TOPIC_*} constants
     * @param source message content handed to the wrapper or service
     * @author liwenbin
     */
    public void issue(String topic, Object source) {
        if (topic == null) {
            // switch on a null String throws NullPointerException; treat it like an unknown topic.
            return;
        }
        switch (topic) {
            case UbwConstant.TOPIC_EMP:
                // Employee and department information
                employeeAndDepartmentService.saveBatch(source);
                break;
            case UbwConstant.TOPIC_VEHICLE:
                // Vehicle information
                List<VehicleInfo> vehicleInfos = MqttWrapper.build().toEntityList(source, new VehicleInfo());
                vehicleInfoService.deleteAll();
                vehicleInfoService.saveBatch(vehicleInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE:
                // Base station information
                List<WorkSite> workSites = MqttWrapper.build().toEntityList(source, new WorkSite());
                workSiteService.deleteAll();
                workSiteService.saveBatch(workSites);
                break;
            case UbwConstant.TOPIC_LOC_STATUS:
                // Real-time status of underground vehicles and personnel
                List<LocStatusInfo> locStatusInfos = MqttWrapper.build().toEntityList(source, new LocStatusInfo());
                if (Func.isEmpty(locStatusInfos)) {
                    break;
                }
                // Convert the history records up front so nothing is deleted if conversion fails
                List<LocStatusInfoHistory> locStatusInfoHistorys = MqttWrapper.build().toEntityList(source, new LocStatusInfoHistory());
                locStatusInfoService.deleteAll();
                // Keep only the entries flagged as being in the well
                List<LocStatusInfo> inWellList = locStatusInfos.stream().filter(s -> s.getIsInWell() == 1).collect(Collectors.toList());
                locStatusInfoService.saveBatch(inWellList);
                // Store the history records
                locStatusInfoHistoryService.saveBatch(locStatusInfoHistorys);
                break;
            case UbwConstant.TOPIC_LOC_OVER_TIME:
                // Overtime alarm information
                List<LocOverTimeSosAlarminfo> locOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source, new LocOverTimeSosAlarminfo());
                locOverTimeSosAlarminfoService.saveBatch(locOverTimeSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_LOC_OVER_AREA:
                // Over-capacity alarm information
                List<LocAreaOverSosAlarminfo> locAreaOverSosAlarmInfos = MqttWrapper.build().toEntityList(source, new LocAreaOverSosAlarminfo());
                locAreaOverSosAlarmInfoService.saveBatch(locAreaOverSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_LOC_SOS:
                // SOS alarm information
                List<LocSosAlarminfo> locSosAlarmInfos = MqttWrapper.build().toEntityList(source, new LocSosAlarminfo());
                locSosAlarminfoService.saveBatch(locSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_ATTEND:
                // Attendance information
                List<AttendanceInfo> attendanceInfos = MqttWrapper.build().toEntityList(source, new AttendanceInfo());
                attendanceInfoService.saveBatch(attendanceInfos);
                break;
            case UbwConstant.TOPIC_HISTORY_LOCUS:
                // Precise trajectory information
                List<HistoryLocusInfo> historyLocusInfos = MqttWrapper.build().toEntityList(source, new HistoryLocusInfo());
                historyLocusInfoService.saveBatch(historyLocusInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE_PASS:
                // Base station pass records
                List<WorkSitePassInfo> workSitePassInfos = MqttWrapper.build().toEntityList(source, new WorkSitePassInfo());
                workSitePassInfoService.saveBatch(workSitePassInfos);
                break;
            case UbwConstant.TOPIC_ENV_MON:
                // Environmental monitoring information
                List<EnvironmentalMonitorInfo> environmentalMonitorInfos = MqttWrapper.build().toEntityList(source, new EnvironmentalMonitorInfo());
                environmentalMonitorInfoService.saveBatch(environmentalMonitorInfos);
                break;
            case UbwConstant.TOPIC_TR_ALERT:
                // Environmental monitoring alert information
                List<TrAlert> trAlerts = MqttWrapper.build().toEntityList(source, new TrAlert());
                trAlertService.saveBatch(trAlerts);
                break;
            case UbwConstant.TOPIC_ADD_EVA:
                // Issued evacuation information
                List<AddEvacuateInfo> addEvacuateInfos = MqttWrapper.build().toEntityList(source, new AddEvacuateInfo());
                addEvacuateInfoService.saveBatch(addEvacuateInfos);
                break;
            case UbwConstant.TOPIC_CANCEL_EVA:
                // Cancelled evacuation information
                List<CancelEvacuateInfo> cancelEvacuateInfos = MqttWrapper.build().toEntityList(source, new CancelEvacuateInfo());
                cancelEvacuateInfoService.saveBatch(cancelEvacuateInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE_NEI:
                // Neighbouring base station relations
                List<WorkSiteNeighbourInfo> workSiteNeighbourInfos = MqttWrapper.build().toEntityList(source, new WorkSiteNeighbourInfo());
                workSiteNeighbourInfoService.deleteAll();
                workSiteNeighbourInfoService.saveBatch(workSiteNeighbourInfos);
                break;
            case UbwConstant.TOPIC_LINK_MSG:
                // Base station link information
                List<LinkMsgAlarmInfo> linkMsgAlarmInfos = MqttWrapper.build().toEntityList(source, new LinkMsgAlarmInfo());
                linkMsgAlarmInfoService.deleteAll();
                linkMsgAlarmInfoService.saveBatch(linkMsgAlarmInfos);
                break;
            case UbwConstant.TOPIC_LEADER_EMP:
                // Shift leader information
                List<LeaderEmployeeInfo> leaderEmployeeInfos = MqttWrapper.build().toEntityList(source, new LeaderEmployeeInfo());
                leaderEmployeeInfoService.deleteAll();
                leaderEmployeeInfoService.saveBatch(leaderEmployeeInfos);
                break;
            case UbwConstant.TOPIC_ELE_MSG:
                // Low battery alarm information
                List<ElectricMsgInfo> electricMsgInfos = MqttWrapper.build().toEntityList(source, new ElectricMsgInfo());
                electricMsgInfoService.saveBatch(electricMsgInfos);
                break;
            case UbwConstant.TOPIC_WORK_AREA:
                // Area information
                List<WorkArea> workAreas = MqttWrapper.build().toEntityList(source, new WorkArea());
                workAreaService.deleteAll();
                workAreaService.saveBatch(workAreas);
                break;
            case UbwConstant.TOPIC_HIS_OVER_TIME_SOS:
                // Historical overtime alarm information
                List<HistoryOverTimeSosAlarmInfo> historyOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source, new HistoryOverTimeSosAlarmInfo());
                historyOverTimeSosAlarmInfoService.saveBatch(historyOverTimeSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_SPECIAL_WORK:
                // Preset routes for special personnel
                List<SpecialWorks> specialWorks = MqttWrapper.build().toEntityList(source, new SpecialWorks());
                specialWorksService.deleteAll();
                specialWorksService.saveBatch(specialWorks);
                break;
            case UbwConstant.TOPIC_ATTEND_LOC:
                // Historical attendance trajectory information
                List<AttendanceLocusInfo> attendanceLocusInfos = MqttWrapper.build().toEntityList(source, new AttendanceLocusInfo());
                attendanceLocusInfoService.saveBatch(attendanceLocusInfos);
                break;
            case UbwConstant.TOPIC_WORK_TYPE:
                // Work type information
                List<WorkType> workTypes = MqttWrapper.build().toEntityList(source, new WorkType());
                workTypeService.deleteAll();
                workTypeService.saveBatch(workTypes);
                break;
            case UbwConstant.TOPIC_OFFICE_POS:
                // Job position information
                List<OfficePosition> officePositions = MqttWrapper.build().toEntityList(source, new OfficePosition());
                officePositionService.deleteAll();
                officePositionService.saveBatch(officePositions);
                break;
            case UbwConstant.TOPIC_CLASS_TEAM:
                // Team information
                List<ClassTeam> classTeams = MqttWrapper.build().toEntityList(source, new ClassTeam());
                classTeamService.deleteAll();
                classTeamService.saveBatch(classTeams);
                break;
            default:
                // Unknown topics are ignored
                break;
        }
    }
}
完结,小伙伴们,可以根据这个demo 改造自己的mqtt服务处理!!!
来源:https://blog.csdn.net/weixin_40986713/article/details/123572101


猜你喜欢
- 大家好,这是 [C#.NET 拾遗补漏] 系列的第 08 篇文章,今天讲 C# 强大的 LINQ 查询。LINQ 是我最喜欢的 C# 语言特
- 前言最近遇到个小问题,要为几十个文本框添加相同的失去焦点事件,常规的办法是在VS的事件管理器里面添加,但那样太繁琐了,几十个文本框,要加几十
- 小菜今年计算机专业大四了,学了不少软件开发方面的东西,也学着编了些小程序,踌躇满志,一心要找一个好单位。当投递了无数份简历后,终于收到了一个
- AOP我想大家都很清楚,有时候我们需要处理一些请求日志,或者对某些方法进行一些监控,如果出现例外情况应该进行怎么样的处理,现在,我们从spr
- 文章描述跑马灯效果,功能效果大家应该都知道,就是当我们的文字过长,整个页面放不下的时候(一般用于公告等),可以让它自动实现来回滚动,以让客户
- 1: Nacos搭建可以参考 https://www.jb51.net/article/196842.htmSpringCloud 版本&l
- 实际的项目开发当中,经常需要根据实际的需求来自定义AlertDialog。最近在开发一个WIFI连接的功能,点击WIFI需要弹出自定义密码输
- 在开发的过程中,往往会需要在组件中添加一些按钮,用于执行一些自定义的操作。例如你有一个组件A,里面有一个List<Collider&g
- 本文实例为大家分享了Android购物分类效果展示的具体代码,供大家参考,具体内容如下SecondActivity.javapublic c
- Spring propagation7种事务配置1、简述在声明式的事务处理中,要配置一个切面,其中就用到了propagation,表示打算对
- 在学会了java中io流的使用后,我们对于数组的排序,又多了一种使用方法。大家知道流处理数据的效率是比较理想的,那么在具体操作数组排序上,很
- DataTable可以通过RowStatus来判断状态是否发生了改变。但是有些时候我们希望在行状态即使为Modified的情况下也不要提示内
- 本文实例讲述了java实现文件重命名的方法。分享给大家供大家参考。具体如下:下载的电影总是有一些存在网站名称等没用的信息 作为一个强迫症患者
- 本文实例讲述了Android之复选框对话框用法。分享给大家供大家参考。具体如下:main.xml布局文件<?xml version=&
- 本文实例讲述了Android桌面插件App Widget用法。分享给大家供大家参考,具体如下:应用程序窗口小部件App Widgets应用程
- 前言大家都知道,equals和hashcode是java.lang.Object类的两个重要的方法,在实际应用中常常需要重写这两个方法,但至
- 前言Guava是google公司开发的一款Java类库扩展工具包,内含了丰富的API,涵盖了集合、缓存、并发、I/O等多个方面。使用这些AP
- Spring提供的工具类,主要用于框架内部使用,这个类提供了一些简单的方法,并且提供了易于使用的方法在分割字符串,如CSV字符串,以及集合和
- 一般情况下在Word中输入的文字都是横向的,今天给大家分享两种方法来设置/更改一个section内的所有文本的方向及部分文本的方向,有兴趣的
- 本文实例讲述了Android模拟器实现手机添加文件到sd卡的方法。分享给大家供大家参考,具体如下:在DDMS中直接添加文件到模拟器sd卡如果