python 多进程队列数据处理详解
作者:gmHappy 发布时间:2022-04-10 23:49:44
标签:python,多进程,队列,数据处理
我就废话不多说了,直接上代码吧!
# -*- coding:utf8 -*-
import paho.mqtt.client as mqtt
from multiprocessing import Process, Queue
import time, random, os
import camera_person_num
MQTTHOST = "172.19.4.4"
MQTTPORT = 1883
mqttClient = mqtt.Client()
q = Queue()
# 连接MQTT服务器
def on_mqtt_connect():
mqttClient.connect(MQTTHOST, MQTTPORT, 60)
mqttClient.loop_start()
# 消息处理函数
def on_message_come(lient, userdata, msg):
# print(msg.topic + ":" + str(msg.payload.decode("utf-8")))
q.put(msg.payload.decode("utf-8")) # 放入队列
print("产生消息", msg.payload.decode("utf-8"))
# 消息处理开启多进程
# p = Process(target=talk, args=("/camera/person/num/result", msg.payload.decode("utf-8")))
# p.start()
def consumer(q, pid):
print("开启消费序列进程", pid)
while True:
msg = q.get()
# p = Process(target=talk, args=("/camera/person/num/result", msg, pid))
# p.start()
talk("/camera/person/num/result", msg, pid)
# subscribe 消息订阅
def on_subscribe():
mqttClient.subscribe("test123", 1) # 主题为"test"
mqttClient.on_message = on_message_come # 消息到来处理函数
# publish 消息发布
def on_publish(topic, msg, qos):
mqttClient.publish(topic, msg, qos);
# 多进程中发布消息需要重新初始化mqttClient
def talk(topic, msg, pid):
cameraPsersonNum = camera_person_num.CameraPsersonNum(msg)
t_max, t_mean, t_min = cameraPsersonNum.personNum()
# time.sleep(20)
print("消费消息", pid, msg)
mqttClient2 = mqtt.Client()
mqttClient2.connect(MQTTHOST, MQTTPORT, 60)
mqttClient2.loop_start()
mqttClient2.publish(topic, '{"max":' + str(t_max) + ',"mean":' + str(t_mean) + ',"min:"' + t_min + '}', 1)
mqttClient2.disconnect()
def main():
on_mqtt_connect()
on_subscribe()
for i in range(1, 3):
c1 = Process(target=consumer, args=(q, i))
c1.start()
while True:
pass
if __name__ == '__main__':
main()
来源:https://blog.csdn.net/ctwy291314/article/details/81560879


猜你喜欢
- 如果用到数据筛选功能,可以使用x if condition else y的逻辑实现。如果使用的是纯Python,可以使用不断迭代的方式对每一
- 关于权限管理的思考最近用laravel设计后台,后台需要有个权限管理。权限管理实质上分为两个部分,首先是认证,然后是权限。认证部分非常好做,
- 第一种使用queue队列实现:#生产者消费者模型 其实服务器集群就是这个模型# 这里介绍的是非yield方法实现过程import threa
- 引子:在windows中python3使用 pycryptodemo 模块实现Aes加密解密。Aes加密有多种方式,自己可以上网了解下。 A
- 本文实例为大家分享了Python实现神经网络算法及应用的具体代码,供大家参考,具体内容如下首先用Python实现简单地神经网络算法:impo
- 如下所示:a = [0,1,2,3,4,5,6,7,8,9]b = a[i:j] 表示复制a[i]到a[j-1],以生成新的list对象b
- MySQL 提供了一个很有意思的Engine:Federated!如果你了解Linux下面的Link的话,就应该很好理解这个Federate
- 入门scipy.optimize中,curve_fit函数可调用非线性最小二乘法进行函数拟合,例如,现在有一个高斯函数想要被拟合则调用方法如
- 把文件解压到一个目录下这是解压后的目录 将my.ini文件考进去 双击打开my.ini找到这两行更改成自己的解压路径保存
- 参考: Smashing magzine翻译+整理: Demix当完成一项前端的工作之后,许多人都会忘记该项目的结构与细节。然而代码并不是马
- 第一种方法:用军哥的一键修改LNMP环境下MYSQL数据库密码脚本一键脚本肯定是非常方便。具体执行以下命令:wget http://soft
- 1. 正则表达式的应用在给用户发送消息时通常情况会有相同的消息模板,但其中部分信息跟用户相关,因此需要对消息模板中的变量部分进行替换。而对于
- 完成人机猜拳互动游戏的开发,用户通过控制台输入实现出拳,电脑通过程序中的随机数实现出拳,每一局结束后都要输出结果。当用户输入n时停止游戏,并
- 先来了解下什么是数据库连接池数据库连接池技术的思想非常简单,将数据库连接作为对象存储在一个Vector对象中,一旦数据库连接建立后,不同的数
- 英文原文:http://www.usabilitypost.com/2009/04/15/8-characteristics-of-succ
- myPhoneBook2.py#!/usr/bin/python# -*- coding: utf-8 -*-import reclass
- 前一篇文章《Python 中如何实现参数化测试?》中,我提到了在 Python 中实现参数化测试的几个库,并留下一个问题:它们是如何做到把一
- python提供了大量的库,可以非常方便的进行各种操作,现在把python中实现读写csv文件的方法使用程序的方式呈现出来。在编写pytho
- <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN&
- 版权所有:Copyright 1997 Netscape Communications Corporation原文链接:Object Hie