python多进程下的生产者和消费者模型
作者:李俊的博客 发布时间:2022-05-30 02:37:07
一、生产者消费者模型介绍
1.1 为什么需要使用生产者消费者模型
生产者是指生产数据的任务,消费者是指消费数据的任务。当生产者的生产能力远大于消费者的消费能力,生产者就需要等消费者消费完才能继续生产新的数据,同理,如果消费者的消费能力远大于生产者的生产能力,消费者就需要等生产者生产完数据才能继续消费,这种等待会造成效率的低下,为了解决这种问题就引入了生产者消费者模型。
1.2 如何实现生产者消费者模型
进程间引入队列可以实现生产者消费者模型,通过使用队列无需考虑锁的概念,因为进程间的通信是通过队列来实现的;
生产者生产的数据往队列里面写,消费者消费数据直接从队列里面取,这样就对实现了生产者和消费者之间的解耦。
生产者 -- > 队列 <--消费者
二、Queue实现生产者消费者模型
2.1 消费者生产者模型代码
from multiprocessing import Process, Queue
import time
# 消费者方法
def consumer(q, name):
while True:
res = q.get()
# if res is None: break
print("%s 吃了 %s" % (name, res))
# 生产者方法
def producer(q, name, food):
for i in range(3):
time.sleep(1) # 模拟生产西瓜的时间延迟
res = "%s %s" % (food, i)
print("%s 生产了 %s" % (name, res))
# 把生产的vegetable放入到队列中
q.put(res)
if __name__ == "__main__":
#创建队列
q = Queue()
# 创建生产者
p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
c1 = Process(target=consumer, args=(q, "peter",))
p1.start()
c1.start()
# p1.join()
# q.put(None)
print("主进程")
2.2 执行结果
2.2.1 直接执行上面的代码的结果
直接执行会出现一个问题就是生产者生产完了,没有向消费者发送一个停止的信号,所以消费者一直会一直阻塞在q.get(),导致程序无法退出。
为了解决上面的问题,让消费者消费完了生产者的数据之后自动退出,就需要在生产者进程介绍的时候往队列里面put一个结束信号,消费者拿到这个信号,就退出消费进程。
主要是两个地方修改 ,把下方代码的注释打开就可以实现消费者消费完接收到生产者的结束信号就退出消费者进程了。
def consumer():
if res is None: break
if __name__ == "__main__":
p1.join()
q.put(None)
2.2.2 把注释打开后的运行结果
把注释打开后,消费者拿到了生产者发送的结束信号,可以正常退出程序了。
但如果有n个消费者,就需要发送n个结束信号,这种方式就不是那么简洁,像下面的代码这样:
from multiprocessing import Process, Queue
import time
# 消费者方法
def consumer(q, name):
while True:
res = q.get()
if res is None: break
print("%s 吃了 %s" % (name, res))
# 生产者方法
def producer(q, name, food):
for i in range(3):
time.sleep(1) # 模拟生产西瓜的时间延迟
res = "%s %s" % (food, i)
print("%s 生产了 %s" % (name, res))
# 把生产的vegetable放入到队列中
q.put(res)
if __name__ == "__main__":
# 创建队列
q = Queue()
# 创建生产者
p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
p2 = Process(target=producer, args=(q, "kelly2", "香蕉"))
c1 = Process(target=consumer, args=(q, "peter",))
c2 = Process(target=consumer, args=(q, "peter2",))
c3 = Process(target=consumer, args=(q, "peter3",))
p1.start()
p2.start()
c1.start()
c2.start()
c3.start()
p1.join()
p2.join()
q.put(None)
q.put(None)
q.put(None)
print("主进程")
其实我们现在就是生产者生产完数据之后想往队列里面发送一个结束信号,python语言提供了另外一种队列JoinableQueue([maxsize])来解决这种问题
三、JoinableQueue实现生产者消费者模型
3.1 JoinableQueue方法介绍
JoinableQueue([maxsize]) : A queue type which also supports join() and task_done() methods
q.task_done():消费者使用此方法发出信号,表示q.get()的返回项目已经被处理。
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理;阻塞将持续到队列中的每个项目均调用q.task_done()方法为止。
3.2 JoinableQueue实现生产者消费者模型源码
from multiprocessing import Process,JoinableQueue
import time
# 消费者方法
def consumer(q, name):
while True:
res = q.get()
if res is None: break
print("%s 吃了 %s" % (name, res))
q.task_done() # 发送信号给q.join(),表示已经从队列中取走一个值并处理完毕了
# 生产者方法
def producer(q, name, food):
for i in range(3):
time.sleep(1) # 模拟生产西瓜的时间延迟
res = "%s %s" % (food, i)
print("%s 生产了 %s" % (name, res))
# 把生产的vegetable放入到队列中
q.put(res)
q.join() # 等消费者把自己放入队列的所有元素取完之后才结束
if __name__ == "__main__":
# q = Queue()
q = JoinableQueue()
# 创建生产者
p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
p2 = Process(target=producer, args=(q, "kelly2", "蓝莓"))
# 创建消费者
c1 = Process(target=consumer, args=(q, "peter",))
c2 = Process(target=consumer, args=(q, "peter2",))
c3 = Process(target=consumer, args=(q, "peter3",))
c1.daemon = True
c2.daemon = True
c3.daemon = True
p_l = [p1, p2, c1, c2, c3]
for p in p_l:
p.start()
p1.join()
p2.join()
# 1.主进程等待p1,p2进程结束才继续执行
# 2.由于q.join()的存在,生产者只有等队列中的元素被消费完才会结束
# 3.生产者结束了,就代表消费者已经消费完了,也可以结束了,所以可以把消费者设置为守护进程(随着主进程的退出而退出)
print("主进程")
3.3 运行结果
通过运行结果可以看出,生产者没有手动发送结束信号给消费者,而是通过JoinableQueue队列的方式也实现了生产者消费者模型。
来源:https://blog.csdn.net/qq_36441027/article/details/105929246


猜你喜欢
- 文件操作1#文件操作流程:1、打开文件,得到一个文件句柄;通过文件句柄操作文件;关闭文件。#将文件打开文件赋给file1,test_file
- 安装好所需要的插件和包:python、django、pip等版本如下:采用Django REST框架3.01、在python文件夹下D:\p
- 本文实例讲述了Python实现复杂对象转JSON的方法。分享给大家供大家参考,具体如下:在Python对于简单的对象转json还是比较简单的
- 前言今天帮师兄赶在deadline之前画论文的图,现学现卖很是刺激,现把使用matplotlib的子库pyplot画折线图和柱状图的代码记录
- 我们在进行表单设计时,可能要用到select下拉选项控件,遗憾的是,IE浏览器默认的select控件外观非常丑陋,而且不能用样式来控制,不能
- jQuery.parent(expr) &nb
- 本文实例讲述了Python实现可获取网易页面所有文本信息的网易网络爬虫功能。分享给大家供大家参考,具体如下:#coding=utf-8#--
- 输入:数字m mm,n nn输出:m mm行n nn列的数字蛇形方阵1. 从方阵最左上角开始,顺时针向内填充。初始化一个m mm行n nn列
- 一、背景大家都知道gevent的机制是单线程+协程机制,当遇到可能会阻塞的操作时,就切换到可运行的协程中继续运行,以此来实现提交系统运行效率
- 1.random库的使用:random库是使用随机数的Python标准库从概率论角度来说,随机数是随机产生的数据(比如抛硬币),但时计算机是
- 本文实例讲述了python实现指定字符串补全空格的方法。分享给大家供大家参考。具体分析如下:如果希望字符串的长度固定,给定的字符串又不够长度
- 表单在提交前我们通常会用客户端JS对其内容进行验证,通常都是写一个函数然后在onsumbit事件中调用,如下:<html><
- 由于是从源码包安装的Mysql,所以系统中是没有红帽常用的servcie mysqld restart这个脚本只好手工重启有人建议Killa
- 301和302 Http状态有啥区别?301,302 都是HTTP状态的编码,都代表着某个URL发生了转移,不同之处在于:301 redir
- redis是一个key-value存储结构。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、lis
- 作为互联网产品设计师,在和前端开发人员沟通时你是否常常会听到这样的声音: —— “大姐,给点专业精神好不好,这个表格是自适应的,你
- 又是一杯奶茶~事情的经过是这样的:又是奶茶,行吧行吧。快点开工,争取李大伟回来之前搞定。李大伟说是6位数字密码那么我们可以利用python生
- 简介Go的标准包Container中包含了常用的容器类型,包括conatiner/list,container/heap,container
- stylus及stylus-loader版本问题安装下面指定本版就解决了:"stylus": "^0.54.7
- 前2天群里发了张git历史图,如下:根据提交历史,可以看出图中所有分支合并都采用merge的方式,具体merge是怎么操作的,可以阅读下边文