Python进程间通信方式
作者:程序猿-张益达 发布时间:2021-12-03 03:08:39
一、通信方式
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块主要通过队列方式
队列:队列类似于一条管道,元素先进先出
需要注意的一点是:队列都是在内存中操作,进程退出,队列清空,另外,队列也是一个阻塞的形态
二、Queue介绍
创建队列的类(底层就是以管道和锁定的方式实现):
Queue([maxsize]):创建共享的进程队列,Queue
是多进程安全的队列,
可以使用Queue实现多进程之间的数据传递。maxsize
是队列中允许最大项数,省略则无大小限制。
三、方法介绍
def put(self, obj, block=True, timeout=None):
插入数据到队列中,Block值默认为True,代表当队列已满时,会阻塞。如果block为False,则队列满会报异常Queue.Full,timeout表示会阻塞到指定时间,直到有剩余的空间供插入,如果时间超时,则报异常Queue.Fulldef get(self, block=True, timeout=None):从队列中取出数据,Block值默认为True,代表当队列为空时,会阻塞。如果block为False,则队列空会报异常Queue.Empty,timeout表示会等待到指定时间,直到取出数据,如果时间超时,则报异常Queue.Empty
def empty(self): 判断队列是否为空,如果空返回True
def full(self): 判断队列是否已满,如果满返回True
def qsize(self): 返回队列的大小
应用举例:
from multiprocessing import Process, Manager
q = Manager().Queue(2)
q.put(1)
q.put(2,block=False,timeout=2)
def func():
print(q.get())
p = Process(target=func)
print("size",q.qsize())
print("full",q.full())
p.start()
p.join()
print("empty",q.empty())
print("get", q.get())
print("get", q.get(block=False,timeout=2))
输出结果:
三、生产者和消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式?
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
四、什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯:
生产者,只需要往队列里面丢东西(生产者不需要关心消费者)
消费者,只需要从队列里面拿东西(消费者也不需要关心生产者)
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
实现方式一:Queue
from multiprocessing import Process,Manager,active_children
import random
import queue
import time
class Producer(Process):
def __init__(self,queue):
super().__init__()
self.queue = queue
def run(self):
for i in range(6):
r = random.randint(0, 99)
time.sleep(1)
self.queue.put(r)
print("add data{}".format(r))
class Consumer(Process):
def __init__(self,queue):
super().__init__()
self.queue = queue
def run(self):
while True:
if not self.queue.empty():
data = self.queue.get()
print("minus data{}".format(data))
if __name__ == '__main__':
q = Manager().Queue() # 创建队列
p = Producer(q)
c = Consumer(q)
p.start()
c.start()
print(active_children()) # 查看现有的进程
p.join()
c.join()
print("结束")
>>>输出
[<ForkProcess(SyncManager-1, started)>, <Producer(Producer-2, started)>, <Consumer(Consumer-3, started)>]
add data83
minus data83
add data72
minus data72
add data8
minus data8
add data63
minus data63
add data75
minus data75
add data52
minus data52
实现方式二:利用JoinableQueue
JoinableQueue([maxsize]):一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。JoinableQueue
的实例除了与Queue对象相同的方法之外还具有:
task_done():使用者使用此方法发出信号,表示get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用task_done()方法为止
from multiprocessing import Process,JoinableQueue
import os
import time
import random
def print_log(msg, log_type="prod"):
if log_type == 'prod':
print("\033[32;1m%s\033[0m" %msg)
elif log_type == 'con':
print("\033[31;1m%s\033[0m" %msg)
def producer(q):
"""
生产者
:param q:
:return:
"""
for i in range(10):
data = random.randint(1,200)
time.sleep(2)
q.put(data) # 放入队列
msg = "add data {}".format(data)
print_log(msg)
q.join() # 生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。
# 阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
def consumer(q):
"""
消费者
:param q:
:return:
"""
while True:
if not q.empty():
time.sleep(5)
data = q.get()
msg = "minus data{}".format(data)
print_log(msg,"con")
q.task_done() # q.get()的返回项目已经被处理
if __name__ == '__main__':
q = JoinableQueue()
prod = Process(target=producer, args=(q,))
con = Process(target=consumer, args=(q,))
con.daemon = True # 设置为守护进程,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
# 开启进程
prod.start()
con.start()
prod.join() # 等待生产和消费完成,主线程结束
print("结束")
输出结果:
来源:https://blog.csdn.net/weixin_41951954/article/details/123331551


猜你喜欢
- 代码如下:<%@ Language=VBScript %> <% Dim 
- 刚刚想从数据库中的表EXPERT_DETAILS中检索出修改人Modifier(类型 VARCHAR2(20),可为空)为空的那些记录,因为
- 将纸的材质融为设计元素现已成为当今网页设计最热门最流行的设计趋势之一。网页设计师可以使用纸屑、硬纸板纹理材质、笔记本和记事薄构成有趣而复杂的
- 背景最近在搭一个新项目的架子,在生产环境中,为了能实时的监控程序的运行状态,少不了逻辑执行时间长度的统计。时间统计这个功能实现的期望有下面几
- 首先我要吐槽一下,看程序的过程中遇见了yield这个关键字,然后百度的时候,发现没有一个能简单的让我懂的,讲起来真TM的都是头头是道,什么参
- Python程序运行时,打开了多个窗口,使用win32gui模块可以设置指定的某一个窗口为当前活动窗口。import re, timeimp
- 比如:,master,test, 表示 该用户为 test 的下级代码,test登录后可以看到 test名下的业务和所有下级代理的业务。相关
- 前言PHP 中有个释放变量的语句叫做unset(从PHP4开始unset已经不再是一个函数了,而是一个语句),本文主要给大家介绍了关于php
- 在ASP中使用FSO组件生成HTML静态页面,共有两个页面index.htm是首页.程序比较简单,主要是用了ASP里的文件操作
- Requests是用Python编写,基于urllib,采用Apache2 Licensed开源协议的HTTP库。它比urllib更方便,可
- 第一步:要使用vant组件安装好vant,npm i vant -S第二步:在你要用到的地方js中引入或者在src/main.js里面引入i
- 1969年8月8日,在北京协和医院降生了一个漂亮的小女孩。接生的阿姨说,她的声音这么大,好象想要全世界的人都听到。后来,她的父亲为她取了一个
- 超文本传输协议http构成了万维网的基础,它利用URI(统一资源标识符)来识别Internet上的数据,而指定文档地址的URI被称为URL(
- 在MacOs运行的PyCharm中,执行python文件,如果不指定python文件字符编码会报错:SyntaxError: Non-ASC
- 因为要写个东西用到,所以百度了一下,居然有朋友乱写,而且比较多,都没有认真测试过,只对字符可以,但是对数字就不可以,而且通用性很差,需要修改
- 引言webpack插件CommonsChunkPlugin的主要作用是抽取webpack项目入口chunk的公共部分,具体的用法就不做过多介
- items()方法返回字典的(键,值)元组对的列表语法以下是items()方法的语法:dict.items()参数 &
- sysbench是一个模块化的、跨平台、多线程基准测试工具,主要用于评估测试各种不同系统参数下的数据库负载情况。目前sysbench代码托管
- 该章节我们将要学习如何将 word 文件转为 PDF文件,其实网上有很多种生成 PDF 的教程,不过绝大多数都是以 windows 为主的,
- 前言当我们使用Python完成自己的原创的工具时,比如:端口扫描、弱口令爆破等。你是否想过添加自己的Logo,以及简要的帮助信息?如下:Sq