python Multiprocessing.Pool进程池模块详解
作者:Python热爱者 发布时间:2023-08-25 09:24:59
前言
Multiprocessing.Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;
但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行它。
Pool类用于需要执行的目标很多,而手动限制进程数量又太繁琐时,如果目标少且不用控制进程数量则可以用Process类。
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes: 是要使用的工作进程数。如果进程是None,那么使用返回的数字os.cpu_count()。也就是说根据本地的cpu个数决定,processes小于等于本地的cpu个数;
initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。
实例方法
(1)apply(func [,args [,kwds ] ] )
使用参数args和关键字参数kwds调用func。它会阻塞,直到结果准备就绪。鉴于此块,更适合并行执行工作。此外,func 仅在池中的一个工作程序中执行。
from multiprocessing import Pool
import time
def test(p):
print(p)
time.sleep(3)
if __name__=="__main__":
pool = Pool(processes=10)
for i in range(500):
'''
('\n'
' (1)遍历500个可迭代对象,往进程池放一个子进程\n'
' (2)执行这个子进程,等子进程执行完毕,再往进程池放一个子进程,再执行。(同时只执行一个子进程)\n'
' for循环执行完毕,再执行print函数。\n'
' ')
'''
pool.apply(test, args=(i,)) #维持执行的进程总数为10,当一个进程执行完后启动一个新进程.
print('test')
pool.close()
pool.join()
'''
1
2
3
4
5
6
7
8
Process finished with exit code -1
'''
for循环内执行的步骤顺序,往进程池中添加一个子进程,执行子进程,等待执行完毕再添加一个子进程……等500个子进程都执行完了,再执行print。(从结果来看,并没有多进程并发)
(2)apply_async(func [,args [,kwds [,callback [,error_callback ] ] ] ] )
异步进程池(非阻塞),返回结果对象的方法的变体。如果指定了回调,则它应该是可调用的,它接受单个参数。当结果变为就绪时,将对其应用回调,即除非调用失败,在这种情况下将应用error_callback。如果指定了error_callback,那么它应该是一个可调用的,它接受一个参数。如果目标函数失败,则使用异常实例调用error_callback。回调应立即完成,否则处理结果的线程将被阻止。
from multiprocessing import Pool
import time
def test(p):
print(p)
time.sleep(3)
if __name__=="__main__":
pool = Pool(processes=2)
for i in range(500):
'''
(1)循环遍历,将500个子进程添加到进程池(相对父进程会阻塞)\n'
(2)每次执行2个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)\n'
'''
pool.apply_async(test, args=(i,)) #维持执行的进程总数为10,当一个进程执行完后启动一个新进程.
print('test')
pool.close()
pool.join()
'''
test
0
1
2
3
4
5
6
7
Process finished with exit code -1
'''
调用join之前,先调用close或者terminate方法,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束。
(3)map(func,iterable [,chunksize ] )
map()内置函数的并行等价物(尽管它只支持一个可迭代的参数)。它会阻塞,直到结果准备就绪。此方法将iterable内的每一个对象作为单独的任务提交给进程池。可以通过将chunksize设置为正整数来指定这些块的(近似)大小。
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
lists = [1, 2, 3]
pool = Pool(processes=2) #定义最大的进程数
pool.map(test, lists) #p必须是一个可迭代变量。
pool.close()
pool.join()
'''
1
2
3
'''
(4)map_async(func,iterable [,chunksize [,callback [,error_callback ] ] ] )
map()返回结果对象的方法的变体。需要传入可迭代对象iterable
from multiprocessing import Pool
import time
def test(p):
print(p)
time.sleep(3)
if __name__=="__main__":
pool = Pool(processes=2)
# for i in range(500):
# '''
# (1)循环遍历,将500个子进程添加到进程池(相对父进程会阻塞)\n'
# (2)每次执行2个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)\n'
# '''
# pool.apply_async(test, args=(i,)) #维持执行的进程总数为10,当一个进程执行完后启动一个新进程.
pool.map_async(test, range(500))
print('test')
pool.close()
pool.join()
'''
test
0
63
1
64
2
65
3
66
Process finished with exit code -1
'''
(5)imap(func,iterable [,chunksize ] )
返回迭代器,next()调用返回的迭代器的方法得到结果,imap()方法有一个可选的超时参数: next(timeout)将提高multiprocessing.TimeoutError如果结果不能内退回超时秒。
(6)close()
防止任何更多的任务被提交到池中。 一旦完成所有任务,工作进程将退出。
(7)terminate()
立即停止工作进程而不完成未完成的工作。当池对象被垃圾收集时,terminate()将立即调用。
(8)join()
等待工作进程退出。必须打电话close()或 terminate()使用之前join()。
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
'''
100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4
Traceback (most recent call last):
File "C:/Users/BruceWong/Desktop/develop/multiprocessingpool.py", line 19, in <module>
print(next(res))
TypeError: 'MapResult' object is not an iterator
Process finished with exit code 1
来源:https://blog.csdn.net/qdPython/article/details/127073777


猜你喜欢
- 看了一段时间关于js原型的知识,js的扩展方法是基于原型的,如Array.prototype.XXXX就是给Array扩展XXX方法,然后数
- python 内置函数filterclass filter(object): """ filter(funct
- python中冒号实际上有两个意思:1.默认全部选择;2. 指定范围。 下面看例子定义数组X=array([[1,2,3,4],[5,6,7
- python的线程有一个类叫Timer可以,用来创建定时任务,但是它的问题是只能运行一次,如果要重复执行,则只能在任务中再调用一次timer
- 啥也不说了,眼泪哗哗的 –来自怨念深重的不灵狗。【运行环境】1、在ubuntu下使用pip安装flask-mongoengine;2、pip
- 在python中利用numpy array进行数据处理,经常需要找出符合某些要求的数据位置,有时候还需要对这些位置重新赋值。这里总结了几种找
- numpy之sum()sum(a)默认为对输入参数中的所有元素进行求和>>> aarray([ 0, 1, &
- python中迭代器和iter()函数迭代器为类序列对象提供了一个类序列的接口。python的迭代无缝地支持序列对象,而且它还允许程序员迭代
- 前言最近遇到的几个网站在提交密码时提交的已经是密文,也就是说在网络上传输的密码是密文,这样提升了密码在网络传输中的安全性。后端语言加解密已
- 语法在python3中,内置函数中已经没有reduce了。要使用reduce,需要从functools模块里引入可以看到,reduce有三个
- 代码如下:Function getTreeRootId(pNodeId) getSQL = "select note_id,par
- list解析先看下面的例子,这个例子是想得到1到9的每个整数的平方,并且将结果放在list中打印出来>>> power2
- 前言大家都知道,英文的分词由于单词间是以空格进行分隔的,所以分词要相对的容易些,而中文就不同了,中文中一个句子的分隔就是以字为单位的了,而所
- 下面列出了asp远程网页数据采集程序中经常用到的函数,很实用,特别是正则表达式过滤函数。包括了使用xmlhttp采集远程网页内容,使用ado
- 无论是 DAO 还是 ADO 都有两种从 Recordset 对象中查询记录的方法: Find 方法和 Seek 方法。在这两种方法中可以让
- import导入包呈现灰色问题问题描述pycharm中单个py文件导入包时呈灰色,而别的文件却能正常显示,我按照CSDN博客上给的设置①右键
- 以下为引用的内容: <html> <head> <title>不刷新页面查询的方法&
- DOM遍历基于ID、元素类型、类名查找元素非常有用,但是如果你想基于它在DOM树中的位置来查找元素该怎么办?换句话说,你有一个给定的元素,你
- 本文实例为大家分享了python远程连接MySQL数据库的具体代码,供大家参考,具体内容如下连接数据库这里默认大家都已经配置安装好 MySQ
- 鼠标回调函数:def setMouseCallback(windowName, #窗口名称onMouse, &n