网络编程
位置:首页>> 网络编程>> Python编程>> python 多进程队列数据处理详解

python 多进程队列数据处理详解

作者:gmHappy  发布时间:2022-04-10 23:49:44 

标签:python,多进程,队列,数据处理

我就废话不多说了,直接上代码吧!


# -*- coding:utf8 -*-
import paho.mqtt.client as mqtt
from multiprocessing import Process, Queue
import time, random, os
import camera_person_num

MQTTHOST = "172.19.4.4"
MQTTPORT = 1883
mqttClient = mqtt.Client()
q = Queue()

# 连接MQTT服务器
def on_mqtt_connect():
 mqttClient.connect(MQTTHOST, MQTTPORT, 60)
 mqttClient.loop_start()

# 消息处理函数
def on_message_come(lient, userdata, msg):
 # print(msg.topic + ":" + str(msg.payload.decode("utf-8")))

q.put(msg.payload.decode("utf-8")) # 放入队列
 print("产生消息", msg.payload.decode("utf-8"))
 # 消息处理开启多进程
 # p = Process(target=talk, args=("/camera/person/num/result", msg.payload.decode("utf-8")))
 # p.start()

def consumer(q, pid):
 print("开启消费序列进程", pid)
 while True:
   msg = q.get()
   # p = Process(target=talk, args=("/camera/person/num/result", msg, pid))
   # p.start()
   talk("/camera/person/num/result", msg, pid)

# subscribe 消息订阅
def on_subscribe():
 mqttClient.subscribe("test123", 1) # 主题为"test"
 mqttClient.on_message = on_message_come # 消息到来处理函数

# publish 消息发布
def on_publish(topic, msg, qos):
 mqttClient.publish(topic, msg, qos);

# 多进程中发布消息需要重新初始化mqttClient
def talk(topic, msg, pid):
 cameraPsersonNum = camera_person_num.CameraPsersonNum(msg)
 t_max, t_mean, t_min = cameraPsersonNum.personNum()
 # time.sleep(20)
 print("消费消息", pid, msg)
 mqttClient2 = mqtt.Client()
 mqttClient2.connect(MQTTHOST, MQTTPORT, 60)
 mqttClient2.loop_start()
 mqttClient2.publish(topic, '{"max":' + str(t_max) + ',"mean":' + str(t_mean) + ',"min:"' + t_min + '}', 1)
 mqttClient2.disconnect()

def main():

on_mqtt_connect()
 on_subscribe()
 for i in range(1, 3):
   c1 = Process(target=consumer, args=(q, i))
   c1.start()
 while True:
   pass

if __name__ == '__main__':
 main()

来源:https://blog.csdn.net/ctwy291314/article/details/81560879

0
投稿

猜你喜欢

手机版 网络编程 asp之家 www.aspxhome.com