Python并发之多进程的方法实例代码
作者:码、、码 发布时间:2022-04-13 12:43:54
一,进程的理论基础
一个应用程序,归根结底是一堆代码,是静态的,而进程才是执行中的程序,在一个程序运行的时候会有多个进程并发执行。
进程和线程的区别:
进程是系统资源分配的基本单位。
一个进程内可以包含多个线程,属于一对多的关系,进程内的资源,被其内的线程共享
线程是进程运行的最小单位,如果说进程是完成一个功能,那么其线程就是完成这个功能的基本单位
进程间资源不共享,多进程切换资源开销,难度大,同一进程内的线程资源共享,多线程切换资源开销,难度小
进程与线程的共同点:
都是为了提高程序运行效率,都有执行的优先权
二,Python的多进程( multiprocessing模块)
创建一个进程(和创建线程类似)
方法一:创建Process对象,通过对象调用start()方法启动进程
from multiprocessing import Process
def foo(name):
print('hello,%s'%name)
if __name__ == '__main__':
p1=Process(target=foo,args=('world',))
p2 = Process(target=foo, args=('China',))
p1.start()
p2.start()
print('=====主进程=====')
# == == =主进程 == == =
# hello, world
# hello, China
#主进程和子进程并发执行
注意:Process对象只能在在 if __name__ == '__main__':下创建,不然会报错。
方法二:自定义一个类继承Process类,并重写run()方法,将执行代码放在其内
from multiprocessing import Process
class MyProcess(Process):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
print('hello,%s'%self.name)
if __name__ == '__main__':
myprocess1 = MyProcess('world')
myprocess2 = MyProcess('world')
myprocess1.start()
myprocess2.start()
Process内置方法
实例方法:
p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True
p.join([timeout]):主线程等待p终止。timeout是可选的超时时间
Process属性
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
p.name:进程的名称
p.pid:进程的pid
p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
守护进程
类似于守护线程,只不过守护线程是对象的一个方法,而守护进程封装成对象的属性。
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
time.sleep(3)
print('hello,%s'%self.name)
if __name__ == '__main__':
myprocess1=MyProcess('world')
myprocess1.daemon = True
myprocess1.start()
print('结束')
#不会输出‘hello world',因为设置为守护进程,主进程不会等待
也可以使用join方法,使主进程等待
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
time.sleep(3)
print('hello,%s'%self.name)
if __name__ == '__main__':
myprocess1=MyProcess('world')
myprocess1.daemon = True
myprocess1.start()
myprocess1.join() #程序阻塞
print('结束')
join()
进程同步和锁
进程虽然不像线程共享资源,但是这并不意味着进程间不 需要加锁,比如不同进程会共享同一个终端 ( 屏幕),或者操作同一个文件,数据库,那么数据安全还是很有必要的,因此我们可以加锁,
from multiprocessing import Process,Lock
import time
def a_print(l): #需要传入对象,因为信息不共享
l.acquire()
print('我要打印信息')
time.sleep(1)
print('我打印完了')
l.release()
if __name__ == '__main__':
l = Lock()
for i in range(20):
p = Process(target=a_print,args=(l,))
p.start()
信号量(Semaphore)
能够并发执行的进程数,超出的进程阻塞,直到有进程运行完成。
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞进程直到其他进程调用release()。
from multiprocessing import Process,Queue,Semaphore
import time,random
def seat(s,n):
s.acquire()
print('学生%d坐下了'%n)
time.sleep(random.randint(1,2))
s.release()
if __name__ == '__main__':
s = Semaphore(5)
for i in range(20):
p = Process(target=seat,args=(s,i))
p.start()
print('-----主进程-------')
注意:其实信号量和锁类似,只是限制进程运行某个代码块的数量(锁为1个),并不是能限制并发的进程,如上述代码,一次性还是创建了20个进程
事件(Event)
from multiprocessing import Process,Event
import time, random
def eating(event):
event.wait()
print('去吃饭的路上...')
def makeing(event):
print('做饭中')
time.sleep(random.randint(1,2))
print('做好了,快来...')
event.set()
if __name__ == '__main__':
event=Event()
t1 = Process(target=eating,args=(event,))
t2 = Process(target=makeing,args=(event,))
t1.start()
t2.start()
# 做饭中
# 做好了,快来...
# 去吃饭的路上...
和线程事件几乎一致
进程队列(Queue)
进程队列是进程通讯的方式之一。使用multiprocessing 下的Queue
from multiprocessing import Process,Queue
import time
def func1(queue):
while True:
info=queue.get()
if info == None:
return
print(info)
def func2(queue):
for i in range(10):
time.sleep(1)
queue.put('is %d'%i)
queue.put(None) #结束的标志
if __name__ == '__main__':
q = Queue()
p1 = Process(target=func1,args=(q,))
p2 = Process(target=func2, args=(q,))
p1.start()
p2.start()
Queue类的方法,源码如下:
class Queue(object):
def __init__(self, maxsize=-1): #可以传参设置队列最大容量
self._maxsize = maxsize
def qsize(self): #返回当前时刻队列中的个数
return 0
def empty(self): #是否为空
return False
def full(self): 是否满了
return False
def put(self, obj, block=True, timeout=None): #放值,blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
pass
def put_nowait(self, obj): #=put(False)
pass
def get(self, block=True, timeout=None): 获取值,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
pass
def get_nowait(self): # = get(False)
pass
def close(self): #将队列关闭
pass
def join_thread(self): #略,几乎不用
pass
def cancel_join_thread(self):
pass
进程队列源码注释
进程池
进程的消耗是很大的,因此我们不能无节制的开启新进程,因此我们可以 通过维护一个进程池来控制进程的数量 。这就不同于信号量,进程池可以从源头控制进程数量。在Python中可以通过如下方法使用
同步调用
from multiprocessing import Pool
import time, random, os
def func(n):
pid = os.getpid()
print('进程%s正在处理第%d个任务'%(pid,n),'时间%s'%time.strftime('%H-%M-%S'))
time.sleep(2)
res = '处理%s'%random.choice(['成功','失败'])
return res
if __name__ == '__main__':
p = Pool(4) #创建4个进程,
li = []
for i in range(10):
res = p.apply(func,args=(i,)) 交给进程池处理,处理完成才返回值,会阻塞,即使池内还有空余进程,相当于顺序执行
li.append(res)
for i in li:
print(i)
#进程1916正在处理第0个任务 时间21-02-53
#进程1240正在处理第1个任务 时间21-02-55
#进程3484正在处理第2个任务 时间21-02-57
#进程7512正在处理第3个任务 时间21-02-59
#进程1916正在处理第4个任务 时间21-03-01
#进程1240正在处理第5个任务 时间21-03-03
#进程3484正在处理第6个任务 时间21-03-05
#进程7512正在处理第7个任务 时间21-03-07
#进程1916正在处理第8个任务 时间21-03-09
#进程1240正在处理第9个任务 时间21-03-11
从结果可以发现两点:
不是并发处理
一直都只有四个进程,串行执行
因此进程池提供了 异步处理 的方式
from multiprocessing import Pool
import time, random, os
def func(n):
pid = os.getpid()
print('进程%s正在处理第%d个任务'%(pid,n),'时间%s'%time.strftime('%H-%M-%S'))
time.sleep(2)
res = '处理%s'%random.choice(['成功','失败'])
return res
if __name__ == '__main__':
p = Pool(4)
li = []
for i in range(10):
res = p.apply_async(func,args=(i,)) 结果不会立刻返回,遇到阻塞,开启下一个进程,在这,相当于几乎同时出现四个打印结果(一个线程处理一个任务,处理完下个任务才能进来)
li.append(res)
p.close() #join之前需要关闭进程池
p.join() #因为异步,所以需要等待池内进程工作结束再继续
for i in li:
print(i.get()) #i是一个对象,通过get方法获取返回值,而同步则没有该方法
关于回调函数
from multiprocessing import Pool
import time, random, os
def func(n):
pid = os.getpid()
print('进程%s正在处理第%d个任务'%(pid,n),'时间%s'%time.strftime('%H-%M-%S'))
time.sleep(2)
res = '处理%s'%random.choice(['成功','失败'])
return res
def foo(info):
print(info) #传入值为进程执行结果
if __name__ == '__main__':
p = Pool(4)
li = []
for i in range(10):
res = p.apply_async(func,args=(i,),callback = foo) callback()回调函数会在进程执行完之后调用(主进程调用)
li.append(res)
p.close()
p.join()
for i in li:
print(i.get())
有回调函数
总结
以上所述是小编给大家介绍的Python并发之多进程的方法实例代码网站的支持!
来源:http://www.cnblogs.com/ifyoushuai/p/9471569.html
猜你喜欢
- MySQL 客户端连接成功后,通过 show [session|global]status 命令 可以提供服务器状态信息,也可以在操作系统上
- 本文实例讲述了Python验证码识别的方法。分享给大家供大家参考。具体实现方法如下:#encoding=utf-8import Image,
- 本文实例讲述了利用PHP函数计算中英文字符串长度的方法。分享给大家供大家参考。具体实现方法如下:一般来说大家知道英文字符占一个字节,而中文字
- 这个只是一个简单的比较无聊的尝试,类似blog等一些网站的换肤功能都是以此方式改变网站的皮肤。对于这些方法大家会的比较多,没啥说的。主要我是
- 除了3天就会失效的临时素材外,开发者有时需要永久保存一些素材,届时就可以通过本接口新增永久素材。最近更新,永久图片素材新增后,将带有URL返
- 前言因为写好了测试xmind脑图后,然后再编写测试用例,实在是太麻烦了,所以我写了一点测试用例后,就网上百度了下,怎么直接把xmind脑图转
- 因文件格式要求,需要将docx 与doc文件相互转换,特寻找python代码,与大家共分享from win32com import clie
- 多表查询1. 增删改一对多:先一后多,外键可以为对象或依赖表的主键(publish and book)publish = Publish.o
- 如何用METADATA替换ADOVBS.INC? 在ASP中,使用组件时,如ADO,得先包含
- 前言发现本站没有一个靠谱的tp6记录行为日志的教程,于是就整理了一下自己在项目中已经投入使用的行为日志中间件的详细配置步骤供大家参考提示:先
- 这是一个很简单的纯CSS相册滑动浏览效果,仅用一个无序列表ul结合简单的CSS就可以实现。原文中介绍的纵向滑动相册的实现方法,但是相比之下个
- 基于web的技术中,分页是一个老的不能再老的,但大家津津乐道的问题,随着xml技术的日渐应用,把xml应用到分页当中,也是一种可能,当然网上
- Golang Goroutine和线程的区别 Golang,轻松学习一、Golang Goroutine?当使用者分配足够多的任务,系统能自
- 破解滑块验证码的思路主要有2种:获得一张完整的背景图和一张有缺口的图片,两张图片进行像素上的一一对比,找出不一样的坐标。获得一张有缺口的图片
- 1 词频统计1.1 简单词频统计1.导入jieba库并定义文本import jiebatext = "Python是一种高级编程语
- 一、牛顿多项式拉格朗日多项式的公式不具备递推性,每个多项式需要单独构造。但很多时候我们需要从若干个逼近多项式选择一个。这个时候我们就需要一个
- 在用ThinkPHP做tags标签的时候,出现了一个问题,就是能获取到参数,但是查不出相应的结果。查看数据库发现数据是存在的。问题出在哪了呢
- 在IE下测试,发现最大值是:18014398509481984(0x40000000000000)另外发现一个奇怪的问题:JS世界居然不存在
- 一个简单的例子:将如下代码另存为.wsc文件,并右键“注册”(卸载时右键“不注册”)。<Component> <regis
- 本文实例讲述了Python表示矩阵的方法。分享给大家供大家参考,具体如下:在c语言中,表示个“整型3行4列”的矩阵,可以这样声明:int&n