Python多进程之进程同步及通信详解
作者:程序员-夏天 发布时间:2022-01-07 00:29:13
上篇文章介绍了什么是进程、进程与程序的关系、进程的创建与使用、创建进程池等,接下来就来介绍一下进程同步及进程通信。
进程同步
当多个进程使用同一份数据资源的时候,因为进程的运行没有顺序,运行起来也无法控制,如果不加以干预,往往会引发数据安全或顺序混乱的问题,所以要在多个进程读写共享数据资源的时候加以适当的策略,来保证数据的一致性问题。
Lock(锁)
一个Lock对象有两个方法:acquire()和release()来控制共享数据的读写权限, 看下面这张图片,使用多进程的时候会经常出现这种情况,这是因为多个进程都在抢占输出资源,共享同一打印终端,从而造成了输出信息的错乱。
那么就可以使用Lock机制:
import multiprocessing
import random
import time
def work(lock, i):
lock.acquire()
print("work'{}'执行中......".format(i), multiprocessing.current_process().name, multiprocessing.current_process().pid)
time.sleep(random.randint(0, 2))
print("work'{}'执行完毕......".format(i))
lock.release()
if __name__ == '__main__':
lock = multiprocessing.Lock()
for i in range(5):
p = multiprocessing.Process(target=work, args=(lock, i))
p.start()
由于引入了Lock机制,同一时间只能有一个进程抢占到输出资源,其他进程等待该进程结束,锁释放到,才可以抢占,这样会解决多进程间资源竞争导致数据错乱的问题,但是由并发执行变成了串行执行,会牺牲运行效率。
进程通信
上篇文章说过,进程之间互相隔离,数据是独立的,默认情况下互不影响,那要如何实现进程间通信呢?Python提供了多种进程通信的方式,下面就来说一下。
Queue(队列)
multiprocessing
模块提供的Queue多进程安全的消息队列,可以实现多进程之间的数据传递。
说明
初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最⼤可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头)。
Queue.qsize()
:返回当前队列包含的消息数量。Queue.empty()
:如果队列为空,返回True,反之False。Queue.full()
:如果队列满了,返回True,反之False。Queue.get(block, timeout)
:获取队列中的⼀条消息,然后将其从列队中移除,block默认值为True。如果block使⽤默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为⽌,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出Queue.Empty异常;如果block值为False,消息列队如果为空,则会⽴刻抛出Queue.Empty异常。Queue.get_nowait()
:相当Queue.get(False)。Queue.put(item, block, timeout)
:将item消息写⼊队列,block默认值为True,如果block使⽤默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写⼊,此时程序将被阻塞(停在写⼊状态),直到消息列队腾出空间为⽌,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出Queue.Full异常;如果block值为False,消息列队如果没有空间可写⼊,则会⽴刻抛出Queue.Full异常。Queue.put_nowait(item)
:相当于Queue.put(item, False)。
from multiprocessing import Process, Queue
import time
def write_task(queue):
"""
向队列中写入数据
:param queue: 队列
:return:
"""
for i in range(5):
if queue.full():
print("队列已满!")
message = "消息{}".format(str(i))
queue.put(message)
print("消息{}写入队列".format(str(i)))
def read_task(queue):
"""
从队列读取数据
:param queue: 队列
:return:
"""
while True:
print("从队列读取:{}".format(queue.get(True)))
if __name__ == '__main__':
print("主进程执行......")
# 主进程创建Queue,最大消息数量为3
queue = Queue(3)
pw = Process(target=write_task, args=(queue, ))
pr = Process(target=read_task, args=(queue, ))
pw.start()
pr.start()
运行结果为:
从结果我们可以看出,队列最大可以放入3条消息,后面再来消息,要等read_task从队列里取出后才行。
Pipe(管道)
Pipe常用于两个进程,两个进程分别位于管道的两端,Pipe(duplex)方法返回(conn1,conn2)代表一个管道的两端,duplex参数默认为True,即全双工模式,若为False,conn1只负责接收信息,conn2负责发送。
send()和recv()方法分别是发送和接受消息的方法。
import multiprocessing
import time
import random
def proc_send(pipe):
"""
发送消息
:param pipe:管道一端
:return:
"""
for i in range(10):
print("process send:{}".format(str(i)))
pipe.send(i)
time.sleep(random.random())
def proc_recv(pipe):
"""
接收消息
:param pipe:管道一端
:return:
"""
while True:
print("Process recv:{}".format(pipe.recv()))
time.sleep(random.random())
if __name__ == '__main__':
# 主进程创建pipe
pipe = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=proc_send,args=(pipe[0], ))
p2 = multiprocessing.Process(target=proc_recv,args=(pipe[1], ))
p1.start()
p2.start()
p1.join()
p2.terminate()
执行结果为:
Semaphore(信号量)
Semaphore用来控制对共享资源的访问数量,和进程池的最大连接数类似。
import multiprocessing
import random
import time
def work(s, i):
s.acquire()
print("work'{}'执行中......".format(i), multiprocessing.current_process().name, multiprocessing.current_process().pid)
time.sleep(i*2)
print("work'{}'执行完毕......".format(i))
s.release()
if __name__ == '__main__':
s = multiprocessing.Semaphore(2)
for i in range(1, 7):
p = multiprocessing.Process(target=work, args=(s, i))
p.start()
上面的代码中使用Semaphore限制了最多有2个进程同时执行,那么来一个进程获得一把锁,计数加1,当计数等于2时,后面再来的进程均需要等待,等前面的进程释放掉,才可以获得锁。
信号量与进程池的概念上类似,但是要区分开来,信号量涉及到加锁的概念。
Event(事件)
Event用来实现进程间同步通信的。运行的机制是:全局定义了一个flag,如果flag值为False,当程序执行event.wait()方法时就会阻塞,如果flag值为True时,程序执行event.wait()方法时不会阻塞继续执行。
Event常⽤函数:
event.wait()
:在进程中插入一个标记(flag),默认为False,可以设置timeout。event.set()
:使flag为Ture。event.clear()
:使flag为False。event.is_set()
:判断flag是否为True。
import multiprocessing
import time
def wait_for_event(e):
print("wait_for_event执行")
e.wait()
print("wait_for_event: e.is_set():{}".format(e.is_set()))
def wait_for_event_timeout(e, t):
print("wait_for_event_timeout执行")
# 只会阻塞2s
e.wait(t)
print("wait_for_event_timeout:e.is_set:{}".format(e.is_set()))
if __name__ == "__main__":
e = multiprocessing.Event()
p1 = multiprocessing.Process(target=wait_for_event, args=(e,))
p1.start()
p2 = multiprocessing.Process(target=wait_for_event_timeout, args=(e, 2))
p2.start()
time.sleep(4)
# 4s之后使用e.set()将flag设为Ture
e.set()
print("主进程:flag设置为True")
执行结果如下:
来源:https://blog.csdn.net/weixin_50097774/article/details/121428521
猜你喜欢
- 设置密码保护SqlServer数据库备份文件! 备份SqlServer数据库 Backup Database [数据库] To disk=&
- 在项目中遇到一情况让困扰了半天,同一张PNG8图片为何部份图标在IE6中消失呢?当时一度怀疑是cache或hosts问题反反复复开关浏览器结
- 一,fso.GetFile提取文件相应的 File 对象1,getfile.asp<%whichfile=Serv
- 也许是这样的。下面我们来推荐一个简单的分页程序,代码和说明见下(两段虚线“-----”间的代码是实现该功能的重要语句):chunfeng.a
- 有在论坛上看到一帖,《请教查询出连续日期记录的方法》,截图如下:Insus.NET尝试写了程序并做了测试,可以得到预期的结果,SQL代码可参
- OL定义有序列表的时候,除非指定list-style-position:inside;,否则文字和前导符是有缩进的。但有的时候,OL定义的列
- 出现这样的问题是当你浏览UTF-8编码的时候,服务器默认用UTF-8的引擎来输出html,当你用再浏览GB2312的页面时,它还是用UTF-
- 写好脚本,注册好服务之后,经测试,ORACLE可以随RHEL启动而启动,但不能随系统关闭而关闭。在网上找答案,发现几乎所有的设置过程帖子都是
- 记住:这时候08安装的时候要自定义一个实例 比如:mysql2008(不能在使用默认实例了) sql server 2008 express
- 这个问题困扰了我很长很长的时间,在跨域获取数据的时候就要用到服务器端的对象,以前一直用的是Msxml.XMLHTTP。但是问题太多了,特别严
- function getElementsByClassName(elem_name,elem_tags) { //elem_name:查询的
- 使用到的函数是curl_init, curl_setopt, curl_exec,curl_close。默认是GET方法,可以选择是否使用H
- 在数组中搜索一个特定值,如果找到返回TRUE否则返回FALSE boolean in_array(mixed needle,array ha
- 第一部分:判断两张图片是否相同要查找重复的图片,必然绕不开判断两张图片是否相同。判断两张图片简单呀!图片可以看成数组,比较两个数组是否相等不
- 最近心情非常差,而且还没有触底的样子,哎~~~总是会忍不住叹气~~~前些日子在Twitter上叨唠说“不在乎IE8什么时候推出,只在乎IE6
- 前言经常在 https://lichess.org/ 上观看大师们玩的国际象棋比赛。这些棋局和棋手的水平超出了我们的想象,如果想知道谁有优势
- 用ASP代码实现对access数据库的在线压缩处理,注意压缩前请备份数据库。我们知道每个一段时间压缩一下access数据库,可以减少数据库的
- 1.intersect为取多个查询结果的交集;2.查询两个基本时间段内表记录的SQL语句;select * from shengjibiao
- 1、基本原理访问网站扫码登录页,网站给浏览器返回一个二维码和一个唯一标志KEY浏览器开启定时轮询服务器,确认KEY对应的扫码结果用户使用ap
- 1、最郁闷的发现!!先看代码:<style>#a #b #c span{color:red;}#b #c span{color: