go语言实现mqtt协议的实践
作者:文杰@ 发布时间:2024-04-23 09:34:38
一、什么是MQTT
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用
MQTT还有一个特点就是客户端之间不用相互通信, MQTT通信更像是邮箱服务,发布者发布消息到服务器,接收者只要订阅了其服务在线后即可收到
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
Topic就是消息名,payload就是消息体
MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。
当应用数据通过MQTT网络发送时,MQTT会把与之相关的**服务质量(QoS)和主题名(Topic)**相关连。
二、Go语言MQTT服务器Broker的搭建
服务端用erlang编写的一个开源项目:emqqtd
# 下载安装包
wget https://github.com/emqx/emqx/releases/download/v4.0.4/emqx-ubuntu18.04-v4.0.4.zip
cd mqttd/emqx
.
├── bin
├── data
├── erts-10.5.2
├── etc
├── lib
├── log
└── releases
# 开启服务
./bin/emqx start
# 查看状态
./bin/emqx_ctl status
# 停止服务
./bin/emqx stop
找到自己的IP,访问http://[你的IP]:18083/#/clients
用户名:admin
密码:public
即可进入服务器的控制台
三、Go客户端访问简单API
客户端用golang客户端的库:“github.com/eclipse/paho.mqtt.golang”
# 下载依赖包
go get -u github.com/eclipse/paho.mqtt.golang
实例如下:
编写了两个函数一个发布一个订阅,传入参数即可服务
修改EMQServerAddress
为你服务器的IP
package main
// 与后端mqtt服务交互
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"log"
"os"
"strconv"
"time"
)
const EMQServerAddress = "你的IP"
// 创建全局mqtt publish消息处理 handler
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Println("Push Message:")
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}
// 创建全局mqtt sub消息处理 handler
var messageSubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Println("收到订阅消息:")
fmt.Printf("Sub Client Topic : %s \n", msg.Topic())
fmt.Printf("Sub Client msg : %s \n", msg.Payload())
}
// 连接的回掉函数
var connectHandler mqtt.OnConnectHandler =func(client mqtt.Client) {
fmt.Println("新的连接!" + " Connected")
}
// 丢失连接的回掉函数
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect loss: %v\n", err)
}
func init() {
// 配置错误提示
mqtt.DEBUG = log.New(os.Stdout, "[mqttDEBUG]", 0)
mqtt.ERROR = log.New(os.Stdout, " [mqttERROR]", 0)
}
/**
* @Description: 发布订阅
* @param clientID
* @param addr
* @param topic
* @param payload
*/
func Push(topic string, qos byte, retain bool, payload string) {
// opts ClientOptions 用于设置 broker,端口,客户端 id ,用户名密码等选项
opts := mqtt.NewClientOptions().AddBroker("tcp://" + EMQServerAddress + ":1883").SetClientID("test_push")
opts.SetKeepAlive(60 * time.Second)
// Message callback handler,在没有任何订阅时,发布端调用此函数
opts.SetDefaultPublishHandler(messagePubHandler)
opts.SetPingTimeout(1 * time.Second)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
//发布消息
// qos是服务质量: ==1: 一次, >=1: 至少一次, <=1:最多一次
// retained: 表示mqtt服务器要保留这次推送的信息,如果有新的订阅者出现,就会把这消息推送给它(持久化推送)
token := client.Publish(topic, qos, retain, payload)
token.Wait()
fmt.Println("Push Data : "+topic, "Data Size is "+strconv.Itoa(len(payload)))
fmt.Println("Disconnect with broker")
client.Disconnect(250)
}
/**
* @Description: 订阅与取消订阅
* @param clientID
* @param addr
* @param topic
* @param isSub
*/
func Subscription(topic string, qos byte, isSub bool, handleFun func([]byte)) {
opts := mqtt.NewClientOptions().AddBroker("tcp://" + EMQServerAddress + ":1883").SetClientID("sub_test")
opts.SetKeepAlive(60 * time.Second)
opts.SetPingTimeout(1 * time.Second)
opts.OnConnect = func(client mqtt.Client) {
fmt.Println("New Subscription! Connected" + " => " + topic)
}
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
if isSub {
// 订阅消息
if token := client.Subscribe(topic, qos, func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Receive Subscribe Message :")
fmt.Printf("Sub Client Topic : %s, Data size is %d \n", msg.Topic(), len(msg.Payload()))
if len(msg.Payload()) > 0 {
handleFun(msg.Payload())
}
}); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
} else {
// 取消订阅
if token := client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
}
}
学习资料:
https://studygolang.com/articles/14452
https://blog.csdn.net/jacky128256/article/details/105610456
来源:https://blog.csdn.net/weixin_43988498/article/details/118714860


猜你喜欢
- 本文实例讲述了Python操作mongodb数据库进行模糊查询操作。分享给大家供大家参考,具体如下:# -*- coding: utf-8
- 为了方便各位朋友,本文收集了一些对Web开发人员非常有用的手册,记得推荐一下哦。HTML 速查手册HTML/XTML in one page
- 环境:vue 2.9.3; webpack目的:接口的调用跨域方式:1、express中间的使用2、nginx代理3、谷歌浏览器跨域设置--
- 本文实例讲述了Python实现获取本地及远程图片大小的方法。分享给大家供大家参考,具体如下:了解过Pillow的都知道,Pillow是一个非
- char、varchar、text和nchar、nvarchar、ntext的区别1、CHAR。CHAR存储定长数据很方便,CHAR字段上的
- $也能匹配\n见Perl语言入门,page 132, 注释61 /^.*$/能匹配"\n"么?能!因为$不仅能匹配行尾,
- 一、用户管理在mysql库里有个user表可以查看已经创建的用户1.创建MySQL用户注意:MySQL中不能单纯通过用户名来说明用户,必须要
- 数据的变化反应到视图前面我们了解到数据劫持之后,我们可以在数据发生修改之后做任何我们想要做的事情,操作视图当然也是OK的命令式操作视图目标:
- 安装anaconda登录anaconda的官网下载,anaconda是一个集成的工具软件不需要我们再次下载。anaconda官网点击下载跳转
- 代码return JsonResponse({"name": "tom"})报错:TYPEERROR
- 这里介绍几个常用的列表操作添加元素添加元素使用列表的内置方法appendnumber = [1, 2, 3, 4]number.append
- 传说用这个语句管用:select top 5 * from tablename order by newid() 我放到sql的查询分析器里
- 以下实例为通过用户输入两个数字,并计算两个数字之和:# -*- coding: UTF-8 -*-# Filename : test.py#
- header() 函数向客户端发送原始的 HTTP 报头。<?php//200 正常状态header('HTTP/1.1 20
- 今天帮助同事解决一个问题,问题是她做的一套页面在FF下显示正常,在IE6下样式却没有效果,也就是没有应用样式。最终发现是编码不匹配的问题,c
- 看下面的例子, var btn = document.getElementById('mybtn'); btn.onclic
- 先来看一段代码:# ~*~ Twisted - A Python tale ~*~from time import sleep# Hello
- Python最大的优点之一就是语法简洁,好的代码就像伪代码一样,干净、整洁、一目了然。要写出 Pythonic(优雅的、地道的、整洁的)代码
- MySQL DATE_FORMAT函数简介要将日期值格式化为特定格式,请使用DATE_FORMAT函数。 DATE_FORMAT函数的语法如
- 逻辑斯蒂回归模型多分类任务上节中,我们使用逻辑斯蒂回归完成了二分类任务,针对多分类任务,我们可以采用以下措施,进行分类。我们以三分类任务为例