go语言实现mqtt协议的实践
作者:文杰@ 发布时间:2024-04-23 09:34:38
一、什么是MQTT
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用
MQTT还有一个特点就是客户端之间不用相互通信, MQTT通信更像是邮箱服务,发布者发布消息到服务器,接收者只要订阅了其服务在线后即可收到
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
Topic就是消息名,payload就是消息体
MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。
当应用数据通过MQTT网络发送时,MQTT会把与之相关的**服务质量(QoS)和主题名(Topic)**相关连。
二、Go语言MQTT服务器Broker的搭建
服务端用erlang编写的一个开源项目:emqqtd
# 下载安装包
wget https://github.com/emqx/emqx/releases/download/v4.0.4/emqx-ubuntu18.04-v4.0.4.zip
cd mqttd/emqx
.
├── bin
├── data
├── erts-10.5.2
├── etc
├── lib
├── log
└── releases
# 开启服务
./bin/emqx start
# 查看状态
./bin/emqx_ctl status
# 停止服务
./bin/emqx stop
找到自己的IP,访问http://[你的IP]:18083/#/clients
用户名:admin
密码:public
即可进入服务器的控制台
三、Go客户端访问简单API
客户端用golang客户端的库:“github.com/eclipse/paho.mqtt.golang”
# 下载依赖包
go get -u github.com/eclipse/paho.mqtt.golang
实例如下:
编写了两个函数一个发布一个订阅,传入参数即可服务
修改EMQServerAddress
为你服务器的IP
package main
// 与后端mqtt服务交互
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"log"
"os"
"strconv"
"time"
)
const EMQServerAddress = "你的IP"
// 创建全局mqtt publish消息处理 handler
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Println("Push Message:")
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}
// 创建全局mqtt sub消息处理 handler
var messageSubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Println("收到订阅消息:")
fmt.Printf("Sub Client Topic : %s \n", msg.Topic())
fmt.Printf("Sub Client msg : %s \n", msg.Payload())
}
// 连接的回掉函数
var connectHandler mqtt.OnConnectHandler =func(client mqtt.Client) {
fmt.Println("新的连接!" + " Connected")
}
// 丢失连接的回掉函数
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect loss: %v\n", err)
}
func init() {
// 配置错误提示
mqtt.DEBUG = log.New(os.Stdout, "[mqttDEBUG]", 0)
mqtt.ERROR = log.New(os.Stdout, " [mqttERROR]", 0)
}
/**
* @Description: 发布订阅
* @param clientID
* @param addr
* @param topic
* @param payload
*/
func Push(topic string, qos byte, retain bool, payload string) {
// opts ClientOptions 用于设置 broker,端口,客户端 id ,用户名密码等选项
opts := mqtt.NewClientOptions().AddBroker("tcp://" + EMQServerAddress + ":1883").SetClientID("test_push")
opts.SetKeepAlive(60 * time.Second)
// Message callback handler,在没有任何订阅时,发布端调用此函数
opts.SetDefaultPublishHandler(messagePubHandler)
opts.SetPingTimeout(1 * time.Second)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
//发布消息
// qos是服务质量: ==1: 一次, >=1: 至少一次, <=1:最多一次
// retained: 表示mqtt服务器要保留这次推送的信息,如果有新的订阅者出现,就会把这消息推送给它(持久化推送)
token := client.Publish(topic, qos, retain, payload)
token.Wait()
fmt.Println("Push Data : "+topic, "Data Size is "+strconv.Itoa(len(payload)))
fmt.Println("Disconnect with broker")
client.Disconnect(250)
}
/**
* @Description: 订阅与取消订阅
* @param clientID
* @param addr
* @param topic
* @param isSub
*/
func Subscription(topic string, qos byte, isSub bool, handleFun func([]byte)) {
opts := mqtt.NewClientOptions().AddBroker("tcp://" + EMQServerAddress + ":1883").SetClientID("sub_test")
opts.SetKeepAlive(60 * time.Second)
opts.SetPingTimeout(1 * time.Second)
opts.OnConnect = func(client mqtt.Client) {
fmt.Println("New Subscription! Connected" + " => " + topic)
}
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
if isSub {
// 订阅消息
if token := client.Subscribe(topic, qos, func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Receive Subscribe Message :")
fmt.Printf("Sub Client Topic : %s, Data size is %d \n", msg.Topic(), len(msg.Payload()))
if len(msg.Payload()) > 0 {
handleFun(msg.Payload())
}
}); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
} else {
// 取消订阅
if token := client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
}
}
学习资料:
https://studygolang.com/articles/14452
https://blog.csdn.net/jacky128256/article/details/105610456
来源:https://blog.csdn.net/weixin_43988498/article/details/118714860
猜你喜欢
- 一、乱码的原因:1、 client客户端的编码不是utf82、server端的编码不是utf83、database数据库的编码不是utf84
- 本文介绍了Python字符串格式化,主要有两种方法,分享给大家,具体如下用于字符串的拼接,性能更优。字符串格式化有两种方式:百分号方式、fo
- 工作中我们经常要两段代码的区别,或者需要查看接口返回的字段与预期是否一致,如何快速定位出两者的差异?除了一些对比的工具比如Beyond Co
- 一、需求描述1.图片展示从如图所示的数据中提取含有"python"、"ubuntu"关键词的所有行数
- 老师罚学生抄写英文单词,结果学生给抄成这样……
- 他们原来都想用PHP的实现随机,但取出多条好像要进行两次以上查询. 翻了手册,找到了下面这个语句,可以完成任务了,但效率较低SELECT&n
- 我希望大家敲一遍<!DOCTYPE html><html><head><meta charset=
- 本文实例为大家分享了Python实现井字棋小游戏的具体代码,供大家参考,具体内容如下import osdef print_board(boa
- 前言:首先,笔者不是web安全的专家,所以这不是web安全方面专家级文章,而是学习笔记、细心总结文章,里面有些是我们phper不易发现或者说
- 什么是Lambda表达式“Lambda 表达式”(lambda expression)是一个匿名函数,Lambda表达式基于数学中的λ演算得
- 常用方法#记住引入numpy时要是用别名np,则所有的numpy字样都要替换 #查询数值类型>>>type(float)d
- 引言最近再做图像处理相关的操作的时间优化,用到了OpenCV和Pillow两个库,两个库各有优缺点。各位小伙伴需要按照自己需求选用。本篇博客
- MySQL 表别名(Alias)SQL 表别名在 SQL 语句中,可以为表名称及字段(列)名称指定别名(Alias),别名是 SQL 标准语
- 论文:Interactive Image Warping(1993年Andreas Gustafsson)算法思路:假设当前点为(x,y),
- 1.每个客户端连接都会从服务器进程中分到一个属于它的线程。而该连接的相应查询都都会通过该线程处理。2.服务器会缓存线程。因此并不会为每个新连
- 引言 在前篇文章中(SQL查询入门(上篇),我对数据库查询的基本概念以及单表查询做了详细的解释,本篇文章中,主要说明SQL中的各种连接以及使
- 模块是用类编写的,只有一个StringIO类,所以它的可用方法都在类中。此类中的大部分函数都与对文件的操作方法类似。例:#coding=gb
- 如下所示:result = os.popen('ps aux') res = result.read()
- 这是个删除非空目录的例子test.asp要执行删除你需要对该目录具有修改权限<% dim fso,tmpfold
- 对于软件来说,每一个新版本的推出都应该是一种进步,不可否认,阿里旺旺2008版相较于之前的版本的确是有很多的进步,但进步的同时却也有着比之前