python threading和multiprocessing模块基本用法实例分析
作者:zhaozhi406 发布时间:2021-08-13 04:23:49
本文实例讲述了python threading和multiprocessing模块基本用法。分享给大家供大家参考,具体如下:
前言
这两天为了做一个小项目,研究了一下python的并发编程,所谓并发无非多线程和多进程,最初找到的是threading模块,因为印象中线程“轻量...”,“切换快...”,“可共享进程资源...”等等,但是没想到这里水很深,进而找到了更好的替代品multiprocessing模块。下面会讲一些使用中的经验。
后面出现的代码都在ubuntu10.04 + python2.6.5的环境下测试通过。
一、使用threading模块创建线程
1、三种线程创建方式
(1)传入一个函数
这种方式是最基本的,即调用threading中的Thread类的构造函数,然后指定参数target=func,再使用返回的Thread的实例调用start()
方法,即开始运行该线程,该线程将执行函数func,当然,如果func需要参数,可以在Thread的构造函数中传入参数args=(...)。示例代码如下:
#!/usr/bin/python
#-*-coding:utf-8-*-
import threading
#用于线程执行的函数
def counter(n):
cnt = 0;
for i in xrange(n):
for j in xrange(i):
cnt += j;
print cnt;
if __name__ == '__main__':
#初始化一个线程对象,传入函数counter,及其参数1000
th = threading.Thread(target=counter, args=(1000,));
#启动线程
th.start();
#主线程阻塞等待子线程结束
th.join();
这段代码很直观,counter函数是一个很无聊的双重循环,需要注意的是th.join()
这句,这句的意思是主线程将自我阻塞,然后等待th表示的线程执行完毕再结束,如果没有这句,运行代码会立即结束。join的意思比较晦涩,其实将这句理解成这样会好理解些“while th.is_alive(): time.sleep(1)”。虽然意思相同,但是后面将看到,使用join也有陷阱。
(2)传入一个可调用的对象
许多的python 对象都是我们所说的可调用的,即是任何能通过函数操作符“()”来调用的对象(见《python核心编程》第14章)。类的对象也是可以调用的,当被调用时会自动调用对象的内建方法__call__()
,因此这种新建线程的方法就是给线程指定一个__call__方法被重载了的对象。示例代码如下:
#!/usr/bin/python
#-*-coding:utf-8-*-
import threading
#可调用的类
class Callable(object):
def __init__(self, func, args):
self.func = func;
self.args = args;
def __call__(self):
apply(self.func, self.args);
#用于线程执行的函数
def counter(n):
cnt = 0;
for i in xrange(n):
for j in xrange(i):
cnt += j;
print cnt;
if __name__ == '__main__':
#初始化一个线程对象,传入可调用的Callable对象,并用函数counter及其参数1000初始化这个对象
th = threading.Thread(target=Callable(counter, (1000,)));
#启动线程
th.start();
#主线程阻塞等待子线程结束
th.join();
这个例子关键的一句是apply(self.func, self.args);
这里使用初始化时传入的函数对象及其参数来进行一次调用。
(3)继承Thread类
这种方式通过继承Thread类,并重载其run方法,来实现自定义的线程行为,示例代码如下:
#!/usr/bin/python
#-*-coding:utf-8-*-
import threading, time, random
def counter():
cnt = 0;
for i in xrange(10000):
for j in xrange(i):
cnt += j;
class SubThread(threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self, name=name);
def run(self):
i = 0;
while i < 4:
print self.name,'counting...\n';
counter();
print self.name,'finish\n';
i += 1;
if __name__ == '__main__':
th = SubThread('thread-1');
th.start();
th.join();
print 'all done';
这个例子定义了一个SubThread类,它继承了Thread类,并重载了run方法,在方法中调用counter4次并打印一些信息,可以看到这种方式比较直观。在构造函数中要记得先调用父类的构造函数进行初始化。
2、python多线程的限制
python多线程有个讨厌的限制,全局解释器锁(global interpreter lock),这个锁的意思是任一时间只能有一个线程使用解释器,跟单cpu跑多个程序一个意思,大家都是轮着用的,这叫“并发”,不是“并行”。手册上的解释是为了保证对象模型的正确性!这个锁造成的困扰是如果有一个计算密集型的线程占着cpu,其他的线程都得等着....,试想你的多个线程中有这么一个线程,得多悲剧,多线程生生被搞成串行;当然这个模块也不是毫无用处,手册上又说了:当用于IO密集型任务时,IO期间线程会释放解释器,这样别的线程就有机会使用解释器了!所以是否使用这个模块需要考虑面对的任务类型。
二、使用multiprocessing创建进程
1、三种创建方式
进程的创建方式跟线程完全一致,只不过要将threading.Thread换成multiprocessing.Process
。multiprocessing模块尽力保持了与threading模块在方法名上的一致性,示例代码可参考上面线程部分的。这里只给出第一种使用函数的方式:
#!/usr/bin/python
#-*-coding:utf-8-*-
import multiprocessing, time
def run():
i = 0;
while i<10000:
print 'running';
time.sleep(2);
i += 1;
if __name__ == '__main__':
p = multiprocessing.Process(target=run);
p.start();
#p.join();
print p.pid;
print 'master gone';
2、创建进程池
该模块还允许一次创建一组进程,然后再给他们分配任务。详细内容可参考手册,这部分研究不多,不敢乱写。
pool = multiprocessing.Pool(processes=4)
pool.apply_async(func, args...)
3、使用进程的好处
完全并行,无GIL的限制,可充分利用多cpu多核的环境;可以接受linux信号,后面将看到,这个功能非常好用。
三、实例研究
该实例假想的任务是:一个主进程会启动多个子进程分别处理不同的任务,各个子进程可能又有自己的线程用于不同的IO处理(前面说过,线程在IO方面还是不错的),要实现的功能是,对这些子进程发送信号,能被正确的处理,例如发生SIGTERM,子进程能通知其线程收工,然后“优雅”的退出。现在要解决的问题有:(1)在子类化的Process对象中如何捕捉信号;(2)如何“优雅的退出”。下面分别说明。
1、子类化Process并捕捉信号
如果是使用第一种进程创建方式(传入函数),那么捕捉信号很容易,假设给进程运行的函数叫func,代码示例如下:
#!/usr/bin/python
#-*-coding:utf-8-*-
import multiprocessing, signal,time
def handler(signum, frame):
print 'signal', signum;
def run():
signal.signal(signal.SIGTERM, handler);
signal.signal(signal.SIGINT, handler);
i = 0;
while i<10000:
print 'running';
time.sleep(2);
i += 1;
if __name__ == '__main__':
p = multiprocessing.Process(target=run);
p.start();
#p.join();
print p.pid;
print 'master gone';
这段代码是在第一种创建方式的基础上修改而来的,增加了两行signal.signal(...)
调用,这是说这个函数要捕捉SIGTERM和SIGINT两个信号,另外增加了一个handler函数,该函数用于捕捉到信号时进行相应的处理,我们这里只是简单的打印出信号值。
注意p.join()
被注释掉了,这里跟线程的情况有点区别,新的进程启动后就开始运行了,主进程也不用等待它运行完,可以该干嘛干嘛去。这段代码运行后会打印出子进程的进程id,根据这个id,在另一个终端输入kill -TERM id,会发现刚才的终端打印出了"signal 15"。
但是使用传入函数的方式有一点不好的是封装性太差,如果功能稍微复杂点,将会有很多的全局变量暴露在外,最好还是将功能封装成类,那么使用类又怎么注册信号相应函数呢?上面的例子貌似只能使用一个全局的函数,手册也没有给出在类中处理信号的例子,其实解决方法大同小异,也很容易,这个帖子http://stackoverflow.com/questions/6204443/python-signal-reading-return-from-signal-handler-function给了我灵感:
class Master(multiprocessing.Process):
def __init__(self):
super(Master,self).__init__();
signal.signal(signal.SIGTERM, self.handler); #注册信号处理函数
self.live = 1;
#信号处理函数
def handler(self, signum, frame):
print 'signal:',signum;
self.live = 0;
def run(self):
print 'PID:',self.pid;
while self.live:
print 'living...'
time.sleep(2);
方法很直观,首先在构造函数中注册信号处理函数,然后定义了一个方法handler作为处理函数。这个进程类会每隔2秒打印一个“living...”,当接收到SIGTERM后,改变self.live的值,run方法的循环检测到这个值为0后就结束了,进程也结束了。
2、让进程优雅的退出
下面放出这次的假想任务的全部代码,我在主进程中启动了一个子进程(通过子类化Process类),然后子进程启动后又产生两个子线程,用来模拟“生产者-消费者”模型,两个线程通过一个队列进行交流,为了互斥访问这个队列,自然要加一把锁(condition对象跟Lock对象差不多,不过多了等待和通知的功能);生产者每次产生一个随机数并扔进队列,然后休息一个随机时间,消费者每次从队列取一个数;而子进程中的主线程要负责接收信号,以便让整个过程优雅的结束。代码如下:
#!/usr/bin/python
#-*-coding:utf-8-*-
import time, multiprocessing, signal, threading, random, time, Queue
class Master(multiprocessing.Process):
def __init__(self):
super(Master,self).__init__();
signal.signal(signal.SIGTERM, self.handler);
#这个变量要传入线程用于控制线程运行,为什么用dict?充分利用线程间共享资源的特点
#因为可变对象按引用传递,标量是传值的,不信写成self.live = true试试
self.live = {'stat':True};
def handler(self, signum, frame):
print 'signal:',signum;
self.live['stat'] = 0; #置这个变量为0,通知子线程可以“收工”了
def run(self):
print 'PID:',self.pid;
cond = threading.Condition(threading.Lock()); #创建一个condition对象,用于子线程交互
q = Queue.Queue(); #一个队列
sender = Sender(cond, self.live, q); #传入共享资源
geter = Geter(cond, self.live, q);
sender.start(); #启动线程
geter.start();
signal.pause(); #主线程睡眠并等待信号
while threading.activeCount()-1: #主线程收到信号并被唤醒后,检查还有多少线程活着(除掉自己)
time.sleep(2); #再睡眠等待,确保子线程都安全的结束
print 'checking live', threading.activeCount();
print 'mater gone';
class Sender(threading.Thread):
def __init__(self, cond, live, queue):
super(Sender, self).__init__(name='sender');
self.cond = cond;
self.queue = queue;
self.live = live
def run(self):
cond = self.cond;
while self.live['stat']: #检查这个进程内的“全局”变量,为真就继续运行
cond.acquire(); #获得锁,以便控制队列
i = random.randint(0,100);
self.queue.put(i,False);
if not self.queue.full():
print 'sender add:',i;
cond.notify(); #唤醒等待锁的其他线程
cond.release(); #释放锁
time.sleep(random.randint(1,3));
print 'sender done'
class Geter(threading.Thread):
def __init__(self, cond, live, queue):
super(Geter, self).__init__(name='geter');
self.cond = cond;
self.queue = queue;
self.live = live
def run(self):
cond = self.cond;
while self.live['stat']:
cond.acquire();
if not self.queue.empty():
i = self.queue.get();
print 'geter get:',i;
cond.wait(3);
cond.release();
time.sleep(random.randint(1,3));
print 'geter done'
if __name__ == '__main__':
master = Master();
master.start(); #启动子进程
需要注意的地方是,在Master的run方法中sender.start()
和geter.start()
之后,按常理应该接着调用sender.join()
和geter.join()
,让主线程等待子线程结束,前面说的join的陷阱就在这里,join将主线程阻塞(blocking)住了,主线程无法再捕捉信号,刚开始研究这块时还以为信号处理函数写错了。网上讨论比较少,这里说的比较清楚http://stackoverflow.com/questions/631441/interruptible-thread-join-in-python,http://www.gossamer-threads.com/lists/python/python/541403
参考:
《python核心编程》
《python manual》
希望本文所述对大家Python程序设计有所帮助。
来源:https://blog.csdn.net/zhaozhi406/article/details/8137670
![](https://www.aspxhome.com/images/zang.png)
![](https://www.aspxhome.com/images/jiucuo.png)
猜你喜欢
- The WeekdayName function returns the weekday name of a specified day o
- 前言matplotlib画图例默认的位置是在图中的各个角落,但有时图例位置会遮挡住图像而不符合我们的需求,需要对图例位置进行调整。代码如下:
- scrapy是用python开发的爬虫框架,从网上查了安装方法,感觉都说的挺复杂,而且很多教程都很有年头了,于是记录了自己的安装过程。首先安
- Wake-On-LAN简称WOL,是一种电源管理功能;如果存在网络活动,则允许设备将操作系统从待机或休眠模式中唤醒。许多主板厂商支持IBM提
- python 里面与时间有关的模块主要是 time 和 datetime如果想获取系统当前时间戳:time.time(),是一个float型
- 相对于Firefox2来说,Firefox3除了采用全新的Gecko 1.9引擎外,在性能、稳定性和安全性方面进行许多改进,在我们最关心的对
- 可以使用Python的email模块来实现带有附件的邮件的发送。SMTP (Simple Mail Transfer Protocol)邮件
- 做电商时,消费者对商品的评论是很重要的,但是不会写代码怎么办?这里有个Chrome插件可以做到简单的数据爬取,一句代码都不用写。下面给大家展
- 都知道最近ChatGPT聊天机器人爆火,我也想方设法注册了账号,据说后面要收费了。ChatGPT是一种基于大语言模型的生成式AI,换句话说它
- Python 网页解析HTMLParse的实例详解使用python将网页抓取下来之后,下一步我们就应该解析网页,提取我们所需要的内容了,在p
- 前言正常图片转化成素描图片无非对图片像素的处理,矩阵变化而已。目前很多拍照修图App都有这一功能,核心代码不超30行。如下利用 Python
- 今天彬Go要向大家推荐9款很棒的可在网页中绘制图表的JavaScript脚本,这些有趣的JS脚本可以帮助你快速方便的绘制图表(线、面、饼、条
- 二值图像的凸壳指的是包围输入二值图像白色区域的最小的凸多边形的像素集合。skimage中的函数from skimage.morphology
- 本文实例讲述了Python3实现的判断回文链表算法。分享给大家供大家参考,具体如下:问题:请判断一个链表是否为回文链表。方案一:指针法cla
- 实例如下所示:>>> import pandas as pd>>> df = pd.DataFrame(
- Python3的 元组(Tuple)Python 的元组与列表类似,不同之处在于元组的元素不能修改。元组使用小括号 ( ),列表使用方括号
- 【人工智能项目】Python Flask搭建yolov3目标检测系统后端代码from flask import Flask, request
- 我看见朋友可以把数据库的记录输出到页面表格上去,觉得很有用。这是怎么做的啊?见下:dbtable.asp<html><he
- 这几天研究HUE,这个东西是基于Django开发的,于是又看了一眼Django,学着写个demo。微软开源的vscode在业界反应不错,以前
- 报错如下:TabError: inconsistent use of tabs and spaces in indentation我推荐一种