Python多线程与同步机制浅析
作者:alwaysrun 发布时间:2021-10-31 03:22:51
线程实现
Python中线程有两种方式:函数或者用类来包装线程对象。threading模块中包含了丰富的多线程支持功能:
threading.currentThread(): 返回当前线程;
threading.enumerate(): 返回包含正在运行的线程列表;
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())等价。
Thread类
通过Thread类来处理线程,类中提供的一些方法:
run(): 用以表示线程执行的方法(可重载实现实际功能);
start(): 启动线程;
join([time]): 等待线程中止(或者超时);
isAlive(): 返回线程是否活动;
getName(): 返回线程名;
setName(): 设置线程名;
setDaemon(True):设置为后台进程(必须在start调用前设定)。
函数方式
通过Thread直接构造线程,然后通过start方法启动线程:
threading.Thread(group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)
各参数说明:
group:指定线程隶属的线程组(当前忽略);
target:指定线程要调度的目标方法(即实现功能的函数);
args:传递给目标方法的参数(以元组的方式);
kwargs:传递给目标方法的参数(以字典的方式);
daemon:指定线程是否为后台线程。
def simpleRoutine(name, delay):
print(f"routine {name} starting...")
time.sleep(delay)
print(f"routine {name} finished")
if __name__ == '__main__':
thrOne = threading.Thread(target=simpleRoutine, args=("First", 1))
thrTwo = threading.Thread(target=simpleRoutine, args=("Two", 2))
thrOne.start()
thrTwo.start()
thrOne.join()
thrTwo.join()
继承方式
直接继承Thread,创建一个新的子类(主要实现run方法):
class SimpleThread (threading.Thread):
def __init__(self, name, delay):
# threading.Thread.__init__(self)
super().__init__()
self.name = name
self.delay = delay
def run(self):
print(f"thread {self.name} starting...")
time.sleep(self.delay)
print(f"thread {self.name} finished")
if __name__ == '__main__':
thrOne = SimpleThread("First", 2)
thrTwo = SimpleThread("Second", 2)
thrOne.start()
thrTwo.start()
thrOne.join()
thrTwo.join()
同步机制
当多个线程同时修改同一条数据时可能会出现脏数据;所以,就需要线程锁,即同一时刻只允许一个线程执行操作。
同步锁Lock
threading提供了Lock和RLock(可重入锁)两个类,它们都提供了如下两个方法来加锁和释放锁:
acquire(blocking=True, timeout=-1):加锁,其中 timeout 参数指定加锁多少秒。
release():释放锁。
两种使用锁的方式:
gCount = 0
def PlusOne(locker):
global gCount
with locker:
gCount += 1、
def MinusOne(locker):
global gCount
if locker.acquire():
gCount -= 1
locker.release()
条件变量Condition
Condition对象内部维护了一个锁(构造时可传递一个Lock/RLock对象,否则内部会自行创建一个RLock)和一个waiting池:
通过acquire获得Condition对象;
当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程;
当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。
Condition对象:
__init__(self,lock=None)
:Condition类总是与一个锁相关联(若不指定lock参数,会自动创建一个与之绑定的RLock对象);
acquire(timeout)
:调用关联锁的acquire()方法;
release()
:调用关联锁的release()方法
wait(timeout)
:线程挂起,直到收到一个notify通知或超时才会被唤醒;必须在已获得锁的前提下调用;
notify(n=1)
:唤醒waiting池中的n个正在等待的线程并通知它:
收到通知的线程将自动调用acquire()方法尝试加锁;
若waiting池中有多个线程,随机选择n个唤醒;
必须在已获得锁的前提下调用,否则将引发错误。
notify_all()
:通知所有线程。
class Producer(threading.Thread):
def __init__(self, cond, storage):
threading.Thread.__init__(self)
self.cond = cond
self.storage = storage
def run(self):
label = 1
while True:
with self.cond:
if len(self.storage) < 10:
self.storage.append(label)
print(f"<- Produce {label} product")
label += 1
self.cond.notify(2)
else:
print(f"<- storage full: Has Produced {label - 1} product")
self.cond.notify_all()
self.cond.wait()
time.sleep(0.4)
class Consumer(threading.Thread):
def __init__(self, name, cond, storage):
threading.Thread.__init__(self)
self.name = name
self.cond = cond
self.storage = storage
def run(self):
while True:
if self.cond.acquire():
if len(self.storage) > 1:
pro = self.storage.pop(0)
print(f"-> {self.name} consumed {pro}")
self.cond.notify()
else:
print(f"-> {self.name} storage empty: no product to consume")
self.cond.wait()
self.cond.release()
time.sleep(1)
信号量Semaphore
信号量对象内部维护一个计数器:
acquire(blocking=True,timeout=None)
时减1,当计数为0就阻塞请求的线程;release()
时加1,当计数大于0恢复被阻塞的线程;
threading中有Semaphore和BoundedSemaphore两个信号量;BoundedSemaphore限制了release的次数,任何时候计数器的值,都不不能大于初始值(release时会检测计数器的值,若大于等于初始值,则抛出ValueError异常)。
通过Semaphore维护生产(release一个)、消费(acquire一个)量:
# products = threading.Semaphore(0)
def produceOne(label, sem: threading.Semaphore):
sem.release()
print(f"{label} produce one")
def consumeOne(label, sem: threading.Semaphore):
sem.acquire()
print(f"{label} consume one")
通过BoundedSemaphore来控制并发数量(最多有Semaphore初始值数量的线程并发):
# runner = threading.BoundedSemaphore(3)
def runBound(name, sem: threading.BoundedSemaphore):
with sem:
print(f"{name} is running")
time.sleep(1)
print(f"{name} finished")
事件Event
事件对象内部有个标志字段,用于线程等待事件的发生:
isSet():返回event的状态值;
wait():状态为False时,一直阻塞;否则立即返回;
set(): 设置状态值为True,激活所有被阻塞的线程;
clear():恢复状态值为False。
多线程等待事件发生,然后开始执行:
def waiters(name, evt: threading.Event):
evt.wait()
print(f"{name} is running")
time.sleep(1)
print(f"{name} finished")
def starting(evt: threading.Event):
evt.set()
print("event is set")
屏障Barrier
屏障用于设定等待线程数量,当数量达到指定值时,开始执行:
threading.Barrier(parties, action=None, timeout=None)
屏障属性与方法:
wait(timeout=None):等待通过屏障;线程被阻塞,直到阻塞的数量达到parties时,被阻塞的线程被同时全部释放;
reset():重置屏障到默认的空状态;
abort():将障碍置为断开状态;导致等待的线程引发BrokenBarrierError异常;
partier():通过障碍所需的线程数;
n_waiting():当前在屏障中等待的线程数;
broken():如果屏障处于断开状态,则返回True。
def waitBarrier(name, barr: threading.Barrier):
print(f"{name} waiting for open")
try:
barr.wait()
print(f"{name} running")
time.sleep(5)
except threading.BrokenBarrierError:
print(f"{name} exception")
print(f"{name} finished")
GIL全局解释器锁
GIL(Global Interpreter Lock,全局解释器锁);cpython中,某个线程想要执行,必须先拿到GIL(可以把GIL看作是“通行证”)。每次释放GIL锁,线程都要进行锁竞争,切换线程,会消耗资源。
由于GIL锁的存在,python里一个进程永远只能同时执行一个线程(拿到GIL的线程),这就是为什么在多核CPU上,python的多线程效率并不高:
CPU密集型代码:由于计算工作多,会很快用完时间片,然后触发GIL的释放与再竞争;
IO密集型代码(文件处理、网络爬虫等):多线程能够有效提升效率(单线程下有IO操作会进行IO等待,造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序执行效率)。
python在使用多线程的时候,调用的是c语言的原生线程:
拿到公共数据
申请GIL
python解释器调用os原生线程
os操作cpu执行运算
当线程执行时间到后,就进行切换(context switch)
来源:https://blog.csdn.net/alwaysrun/article/details/127157924


猜你喜欢
- 本文实例讲述了Python中@property的理解和使用。分享给大家供大家参考,具体如下:重看狗书,看到对User表定义的时候有下面两行
- 在日常工作编程中存在着各种随机事件,同样在编程中生成随机数字的时候也是一样,随机有多随机呢?在涉及信息安全的情况下,它是最重要的问题之一。每
- 目录话不多说,直接贴所有代码运行效果需要用到的两张图片总结话不多说,直接贴所有代码import randomimport sysimport
- 最近因为要安装Tensorflow,然后发现tensorflow居然不支持python3.7,于是怒而将其降级到3.6以下是具体命令,mar
- 层及索引levels,刚开始学习pandas的时候没有太多的操作关于groupby,仅仅是简单的count、sum、size等等,没有更深入
- 时间库—arrow使用背景日期时间处理在实际应用场景中无处不在,所以这也成了编程语言中必不可少的模块,Python 也不例外。但是,你知道在
- close方法可以释放一个连接的资源,但是不是立即释放,如果想立即释放,那么在close之前使用shutdown方法shut_rd() --
- 前言django,web开发中,用django-debug-toolbar来调试请求的接口,无疑是完美至极。 可能本人,见识博浅,才说完美至
- 本文实例讲述了python开发中range()函数用法。分享给大家供大家参考,具体如下:python中的range()函数的功能很强大,所以
- 在数据处理与分析领域,对数值型与字符型类别变量加以编码是不可或缺的预处理操作;这里介绍两种不同的方法。1 OneHotEncoder首先导入
- 一、修改密码1.1 创建修改密码控制器运行命令php artisan make:controller Auth/PasswordContro
- 前言结构体是包含多个字段的集合类型,用于将数据组合为记录。这样可以将与同一实体相关联的数据利落地封装到一个轻量的类型定义中,然后通过对该结构
- 【OpenCV】⚠️高手勿入! 半小时学会基本操作⚠️色彩空间概述OpenCV 是一个跨平台的计算机视觉库, 支持多语言, 功能强大. 今天
- 近日,sql数据库入门学习群有朋友问到,利用sql如何删除表格的前1000行数据,是否可以实现?如果是oracle数据库管理软件,实现起来相
- 用df命令查了下,果然磁盘满了,因为当时分区采用系统默认,不知道为什么不能自动扩容!以后在处理这个问题!如图所示:[root@snsgou
- 以下为引用的内容: <html> <head> <title>不刷新页面查询的方法&
- 用Python基于Google Bard做一个交互式的聊天机器人之前已经通过浏览器试过了 Google Bard ,更多细节请看: Try
- 一直很想就搜索结果页写一些心得文章出来,甚至连目录都整理好了可是就是一直没有动手。因为总是觉得还差很多东西需要研究需要分析需要验证。最近也组
- 摘要: 有个C++项目是读取配置参数文件并打印对应的结果,后来需要多次修改配置文件并运行,于是想到写个python脚本执行这一过程。写一个测
- 在DreamWeaver中编写CSS,这种编写习惯并不提倡,不过由于"可视化"和操作简便,使用的朋友依然很多,今天罗列一