Python多进程通信Queue、Pipe、Value、Array实例
作者:junjie 发布时间:2023-04-08 03:25:55
queue和pipe的区别: pipe用来在两个进程间通信。queue用来在多个进程间实现通信。 此两种方法为所有系统多进程通信的基本方法,几乎所有的语言都支持此两种方法。
1)Queue & JoinableQueue
queue用来在进程间传递消息,任何可以pickle-able的对象都可以在加入到queue。
multiprocessing.JoinableQueue 是 Queue的子类,增加了task_done()和join()方法。
task_done()用来告诉queue一个task完成。一般地在调用get()获得一个task,在task结束后调用task_done()来通知Queue当前task完成。
join() 阻塞直到queue中的所有的task都被处理(即task_done方法被调用)。
代码:
import multiprocessing
import time
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print ('%s: Exiting' % proc_name)
self.task_queue.task_done()
break
print ('%s: %s' % (proc_name, next_task))
answer = next_task() # __call__()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1) # pretend to take some time to do the work
return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
def __str__(self):
return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Start consumers
num_consumers = multiprocessing.cpu_count()
print ('Creating %d consumers' % num_consumers)
consumers = [ Consumer(tasks, results)
for i in range(num_consumers) ]
for w in consumers:
w.start()
# Enqueue jobs
num_jobs = 10
for i in range(num_jobs):
tasks.put(Task(i, i))
# Add a poison pill for each consumer
for i in range(num_consumers):
tasks.put(None)
# Wait for all of the tasks to finish
tasks.join()
# Start printing results
while num_jobs:
result = results.get()
print ('Result:', result)
num_jobs -= 1
注意小技巧: 使用None来表示task处理完毕。
运行结果:
2)pipe
pipe()返回一对连接对象,代表了pipe的两端。每个对象都有send()和recv()方法。
代码:
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
p.join()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
3)Value + Array
Value + Array 是python中共享内存 映射文件的方法,速度比较快。
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = n.value + 1
for i in range(len(a)):
a[i] = a[i] * 10
if __name__ == '__main__':
num = Value('i', 1)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
p2 = Process(target=f, args=(num, arr))
p2.start()
p2.join()
print(num.value)
print(arr[:])
# the output is :
# 2
# [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
# 3
# [0, 100, 200, 300, 400, 500, 600, 700, 800, 900]


猜你喜欢
- 目前绝大多数手机都支持WAP 2.0。WAP 2.0的页面设计具有更好的视觉效果,更接近网页。不过由于手机千差万别,手机浏览器的能力也各不相
- 版本一conda install xxx:这种方式安装的库都会放在/Users/orion-orion/miniforge3/pkgs目录下
- DATAFRAME中使用iat[1,0]和iloc[0,1]对元素进行修改。a = [("hahaha",1),(&qu
- 1. python中创建新的csv文件(1). 使用csv.writer()创建:代码如下:import csvheaders = [
- 几天写过两篇使用VPS的安全性设置的博文,其实不管我们如何设置安全,及时的备份VPS数据才是最为重要的。因为VPS与主机不同,主机可能很多时
- k8s容器互联-flannel host-gw原理篇容器系列文章容器系列视频简析host-gw前面分析了flannel vxlan模式进行容
- 在默认情况下,Access 2000/2002数据库是以“共享”的方式打开的,这样可以保证多人能够同时使用同一个数据库。不过,在共享方式打开
- 8. 使用DECODE函数来减少处理时间使用DECODE函数可以避免重复扫描相同记录或重复连接相同的表. 例如: SELECT COUNT(
- 摘要:SELECT 语句可以帮助我们从MySQL中取出数据。SELECT 大概是 SQL 语言中最常用的语句,而且怎样使用它也最为讲究;用它
- Jupyter Notebook本身是默认使用一种Anaconda中root目录下的Python环境的,如果想使用其它的虚拟环境,还需要通过
- 这篇文章主要介绍了python3下pygame如何实现显示中文,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,
- python怎么模拟点击网页按钮前提环境: Python3 和 Visual Studio Code安装完毕安装selenium : 在终端
- 1.什么是搜索引擎?搜索引擎是“对网络信息资源进行搜集整理并提供信息查询服务的系统,包括信息搜集、信息整理和用户查询三部分”。如图1是搜索引
- 目录一.简介二.特色三.flask规模化四. flask Blueprint总结一.简介Flask是一个使用Python编写的轻量级Web应
- 需求最近公司干活,收到一个需求,说是让手动将数据库查出来的信息复制粘贴到excel中,在用excel中写好的公式将指定的两列数据用updat
- 什么是异常?异常是一个事件,其中一个程序,破坏程序的指令的正常流的执行过程中而发生的。一般情况下,当一个Python脚本遇到一些情况不能处理
- ASP正则表达式,RegExp对象提供简单的正则表达式支持功能。RegExp对象的用法: Function RegExpTest(
- SQL Server 2005开始,我们可以直接通过CTE来支持递归查询,CTE即公用表表达式公用表表达式(CTE),是一个在查询中定义的临
- 本文实例为大家分享了python实现银行系统的具体代码,供大家参考,具体内容如下1、admin.py 定义管理员信息和主界面显示#!/usr
- 很多小伙伴对于slice参数的概念理解停留在概念上,切片的参数有三个,分别是step 、start 、stop 。因为参数的值也是多变的,所