在 Python 中使用 MQTT的方法
作者:EMQX 发布时间:2022-01-26 12:32:48
目录
项目初始化
选择 MQTT 客户端库
Pip 安装 Paho MQTT 客户端
Python MQTT 使用
连接 MQTT 服务器
导入 Paho MQTT客户端
设置 MQTT Broker 连接参数
编写 MQTT 连接函数
发布消息
订阅消息
完整代码
消息发布代码
消息订阅代码
测试
消息发布
消息订阅
总结
Python 是一种广泛使用的解释型、高级编程、通用型编程语言。Python 的设计哲学强调代码的可读性和简洁的语法(尤其是使用空格缩进划分代码块,而非使用大括号或者关键词)。Python 让开发者能够用更少的代码表达想法,不管是小型还是大型程序,该语言都试图让程序的结构清晰明了。
MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。
本文主要介绍如何在 Python 项目中使用 paho-mqtt 客户端库 ,实现客户端与 MQTT 服务器的连接、订阅、取消订阅、收发消息等功能。
项目初始化
本项目使用 Python 3.6 进行开发测试,读者可用如下命令确认 Python 的版本。
➜ ~ python3 --version
Python 3.6.7
选择 MQTT 客户端库
paho-mqtt 是目前 Python 中使用较多的 MQTT 客户端库,它在 Python 2.7 或 3.x 上为客户端类提供了对 MQTT v3.1 和 v3.1.1 的支持。它还提供了一些帮助程序功能,使将消息发布到 MQTT 服务器变得非常简单。
Pip 安装 Paho MQTT 客户端
Pip 是 Python 包管理工具,该工具提供了对 Python 包的查找、下载、安装、卸载的功能。
pip3 install -i https://pypi.doubanio.com/simple paho-mqtt
Python MQTT 使用
连接 MQTT 服务器
本文将使用 EMQ X 提供的 免费公共 MQTT 服务器 ,该服务基于 EMQ X 的 MQTT 物联网云平台 创建。服务器接入信息如下:
Broker: broker.emqx.io
TCP Port: 1883
Websocket Port: 8083
导入 Paho MQTT客户端
from paho.mqtt import client as mqtt_client
设置 MQTT Broker 连接参数
设置 MQTT Broker 连接地址,端口以及 topic,同时我们调用 Python random.randint 函数随机生成 MQTT 客户端 id。
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
client_id = f'python-mqtt-{random.randint(0, 1000)}'
编写 MQTT 连接函数
编写连接回调函数 on_connect ,该函数将在客户端连接后被调用,在该函数中可以依据 rc 来判断客户端是否连接成功。通常同时我们将创建一个 MQTT 客户端,该客户端将连接到 broker.emqx.io 。
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# Set Connecting Client ID
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
发布消息
首先定义一个 while 循环语句,在循环中我们将设置每秒调用 MQTT 客户端 publish 函数向 /python/mqtt 主题发送消息。
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
订阅消息
编写消息回调函数 on_message ,该函数将在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
完整代码
消息发布代码
# python 3.6
import random
import time
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
消息订阅代码
# python 3.6
import random
import time
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
消息订阅代码
# python3.6
import random
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
测试
消息发布
运行 MQTT 消息发布代码,我们将看到客户端连接成功,并且成功将消息发布。
python3 pub.py
消息订阅
运行 MQTT 消息订阅代码,我们将看到客户端连接成功,并且成功接收到发布的消息。
python3 sub.py
总结
至此,我们完成了使用 paho-mqtt 客户端连接到 公共 MQTT 服务器 ,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅。
与 C ++ 或 Java 之类的高级语言不同,Python 比较适合设备侧的业务逻辑实现,使用 Python 您可以减少代码上的逻辑复杂度,降低与设备的交互成本。我们相信在物联网领域 Python 将会有更广泛的应用。
接下来我们将会陆续发布更多关于物联网开发及 Python 的相关文章,敬请关注。
来源:https://www.tuicool.com/articles/N3QvQbm


猜你喜欢
- 穿过云朵升一级是要花6个金币的,有的时候金币真的很重要前言嗨喽,大家好呀!这里是魔王~一天晚上,天空中掉下一颗神奇的豌豆种子,正好落在了梦之
- 执行文件和目标导入模块在同一目录直接import比如我要在ma_main.py中导入env包中的make_env.py文件, 从而读取其中的
- 之前介绍过通过cookie 绕过验证码实现登录的方法。这里并不多余,会增加分析和另外一种方法实现登录。1、思路介绍 1.1、直接看
- 问题描述: 买了mac电脑,第一次装mysql,不知道初始密码,如何修改初始密码记录下。解决方式:htt
- 这篇文章主要介绍了Python接口自动化判断元素原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的
- 动态生成二级菜单树:<script>jQuery(function($) {/**********获取未处理报警信息总数****
- 目录match 分组re.sub 匹配和替换反向引用参考re 模块是 Python 标准库中提供的用于处理正则表达式的模块,利用
- 前言微服务中的日志采集方案ELK(EFK)已经是基本事实标准了,但是单体服务中却没有像ELK这样的成熟采集方案,这与单体性质有关,单体毕竟涉
- 本文是基于Windows 10系统环境,实现python生成随机数、随机字符、随机字符串:Windows 10PyCharm 2018.3.
- 1、集合相加a = {1,2,3}b = {3,4,5}print(type(a))print(a|b)2、queryset 符合条件的筛序
- 有不少朋友在开发爬虫的过程中喜欢使用Selenium + Chromedriver,以为这样就能做到不被网站的反爬虫机制发现。先不说淘宝这种
- 简单实现ip代理,为了不卖广告,请自行准备一个ip代理的平台例如我用的这个平台,每次提取10个ip从上面可以看到数据格式是文本,换行是\r\
- BULK INSERT以用户指定的格式复制一个数据文件至数据库表或视图中。 语法:BULK INSERT [ [ 'database
- 安装pyecharts1.8.0版本后导入pyecharts模块绘图时报错: “所有图表类型将在 v1.9.0 版本开始强制使用 Chart
- “%”的使用格式符描述%s字符串 (采用str()的显示)%r字符串 (采用repr()的显示)%c
- 当where子句对某一列使用函数时,除非利用这个简单的技术强制索引,否则Oracle优化器不能在查询中使用索引。通常情况下,如果在WHERE
- 问题你想使用一个装饰器去包装函数,但是希望返回一个可调用的实例。 你需要让你的装饰器可以同时工作在类定义的内部和外部。解决方案为了将装饰器定
- 在炼丹时,数据的读取与预处理是关键一步。不同的模型所需要的数据以及预处理方式各不相同,如果每个轮子都我们自己写的话,是很浪费时间和精力的。P
- PHP hebrev() 函数实例反向显示希伯来字符:<?php echo hebrev("á çù&
- 利用go语言的协程并发优势爬取网页速度相当之快,博客园100页新闻标题只需一秒即可全部爬取package mainimport ( &quo