python使用threading.Condition交替打印两个字符
作者:tinyid 发布时间:2022-04-13 09:01:46
Python中使用threading.Condition交替打印两个字符的程序。
这个程序涉及到两个线程的的协调问题,两个线程为了能够相互协调运行,必须持有一个共同的状态,通过这个状态来维护两个线程的执行,通过使用threading.Condition对象就能够完成两个线程之间的这种协调工作。
threading.Condition默认情况下会通过持有一个ReentrantLock来协调线程之间的工作,所谓可重入锁,是只一个可以由一个线程递归获取的锁,此锁对象会维护当前锁的所有者(线程)和当前所有者递归获取锁的次数(本文在逻辑上和可重入锁没有任何关系,完全可以用一个普通锁替代)。
Python文档中给出的描述是:它是一个与某个锁相联系的变量。同时它实现了上下文管理协议。其对象中除了acquire和release方法之外,其它方法的调用的前提是,当前线程必须是这个锁的所有者。
通过代码和其中的注释,能够非常明白地弄清楚Condition的原理是怎样的:
import threading
import time
import functools
def worker(cond, name):
"""worker running in different thread"""
with cond: # 通过__enter__方法,获取cond对象中的锁,默认是一个ReentrantLock对象
print('...{}-{}-{}'.format(name, threading.current_thread().getName(), cond._is_owned()))
cond.wait() # 创建一个新的锁NEWLOCK,调用acquire将NEWLOCK获取,然后将NEWLOCK放入等待列表中,\
# 释放cond._lock锁(_release_save),最后再次调用acquire让NEWLOCK阻塞
print('wait returned in {}'.format(name))
if __name__ == '__main__':
condition = threading.Condition()
t1 = threading.Thread(target=functools.partial(worker, condition, 't1'))
t2 = threading.Thread(target=functools.partial(worker, condition, 't2'))
t2.start() # 启动线程2
t1.start() # 启动线程1
time.sleep(2)
with condition:
condition.notify(1) # 按照FIFO顺序(wait调用顺序),释放一个锁,并将其从等待列表中删除
time.sleep(2)
with condition:
condition.notify(1) # 按照FIFO顺序(wait调用顺序),释放另一个锁,并将其从等待队列中删除
t1.join() # 主线程等待子线程结束
t2.join() # 主线程等待子线程结束
print('All done')
其输出为:
...t2-Thread-2-True
...t1-Thread-1-True
wait returned in t2
wait returned in t1
All done
其中wait方法要求获取到threading.Condition对象中的锁(如果没有提供,默认使用一个可重入锁),然后自己创建一个新的普通锁(NEWLOCK),并获取这个NEWLOCK;之后调用_release_save方法释放threading.Condition对象中的锁,让其它线程能够获取到;最后再次调用NEWLOCK上的acquire方法,由于在创建时已经acquire过,所以此线程会阻塞在此。而wait想要继续执行,必须等待其它线程将产生阻塞的这个NEWLOCK给release掉,当然,这就是notify方法的责任了。
notify方法接收一个数字n,从等待列表中取出相应数量的等待对象(让wait方法阻塞的锁对象),调用其release方法,让对应的wait方法能够返回。而notify_all方法仅仅就是将n设置为等待列表的总长度而已。
在理解了threading.Condition对象中wait和notify的工作原理之后,我们就可以利用它们来实现两个线程交替打印字符的功能了:
import threading
import functools
import time
def print_a(state):
while True:
if state.closed:
print('Close a')
return
print('A')
time.sleep(2)
state.set_current_is_a(True)
state.wait_for_b()
def print_b(state):
while True:
if state.closed:
print('Close b')
return
state.wait_for_a()
print('B')
time.sleep(2)
state.set_current_is_a(False)
if __name__ == '__main__':
class State(object):
"""state used to coordinate multiple(two here) threads"""
def __init__(self):
self.condition = threading.Condition()
self.current_is_a = False
self.closed = False
def wait_for_a(self):
with self.condition:
while not self.current_is_a:
self.condition.wait()
def wait_for_b(self):
with self.condition:
while self.current_is_a:
self.condition.wait()
def set_current_is_a(self, flag):
self.current_is_a = flag
with self.condition:
self.condition.notify_all()
state = State()
t1 = threading.Thread(target=functools.partial(print_a, state))
t2 = threading.Thread(target=functools.partial(print_b, state))
try:
t1.start()
t2.start()
t1.join()
t2.join()
except KeyboardInterrupt:
state.closed = True
print('Closed')
可以看到有两种类型的任务,一个用于打印字符A,一个用于打印字符B,我们的实现种让A先于B打印,所以在print_a中,先打印A,再设置当前字符状态并释放等待列表中的所有锁(set_current_is_a),如果没有这一步,current_is_a将一直是False,wait_for_b能够返回,而wait_for_a却永远不会返回,最终效果就是每隔两秒就打印一个字符A,而B永远不会打印。另一个副作用是如果wait_for_a永远不会返回,那print_b所在线程的关闭逻辑也就无法执行,最终会成为僵尸线程(这里的关闭逻辑只用作示例,生产环境需要更加完善的关闭机制)。
考虑另一种情况,print_a种将set_current_is_a和wait_for_b交换一下位置会怎么样。从观察到的输出我们看到,程序首先输出了一个字符A,以后,每隔2秒钟,就会同时输出A和B,而不是交替输出。原因在于,由于current_is_a还是False,我们先调用的wait_for_b其会立即返回,之后调用set_current_is_a,将current_is_a设置为True,并释放所有的阻塞wait的锁(notify_all),这个过程中没有阻塞,print_a紧接着进入了下一个打印循环;与此同时,print_b中的wait_for_a也返回了,进入到B的打印循环,故最终我们看到A和B总是一起打印。
可见对于threading.Condition的使用需要多加小心,要注意逻辑上的严谨性。
附一个队列版本:
import threading
import functools
import time
from queue import Queue
def print_a(q_a, q_b):
while True:
char_a = q_a.get()
if char_a == 'closed':
return
print(char_a)
time.sleep(2)
q_b.put('B')
def print_b(q_a, q_b):
while True:
char_b = q_b.get()
if char_b == 'closed':
return
print(char_b)
time.sleep(2)
q_a.put('A')
if __name__ == '__main__':
q_a = Queue()
q_b = Queue()
t1 = threading.Thread(target=functools.partial(print_a, q_a, q_b))
t2 = threading.Thread(target=functools.partial(print_b, q_a, q_b))
try:
t1.start()
t2.start()
q_a.put('A')
t1.join()
t2.join()
except KeyboardInterrupt:
q_a.put('closed')
q_b.put('closed')
print('Done')
队列版本逻辑更清晰,更不容易出错,实际应用中应该选用队列。
附一个协程版本(Python 3.5+):
import time
import asyncio
async def print_a():
while True:
print('a')
time.sleep(2) # simulate the CPU block time
await asyncio.sleep(0) # release control to event loop
async def print_b():
while True:
print('b')
time.sleep(2) # simulate the CPU block time
await asyncio.sleep(0) # release control to event loop
async def main():
await asyncio.wait([print_a(), print_b()])
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
协程的运行需要依附于一个事件循环(select/poll/epoll/kqueue),通过async def将一个函数定义为协程,通过await主动让渡控制权,通过相互让渡控制权完成交替打印字符。整个程序运行于一个线程中,这样就没有线程间协调的工作,仅仅是控制权的让渡逻辑。对于IO密集型操作,而没有明显的CPU阻塞(计算复杂,以致出现明显的延时,比如复杂加解密算法)的情况下非常合适。
附一个Java版本:
PrintMain类,用于管理和协调打印A和打印B的两个线程:
package com.cuttyfox.tests.self.version1;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class PrintMain {
private boolean currentIsA = false;
public synchronized void waitingForPrintingA() throws InterruptedException {
while (this.currentIsA == false) {
wait();
}
}
public synchronized void waitingForPrintingB() throws InterruptedException {
while (this.currentIsA == true) {
wait();
}
}
public synchronized void setCurrentIsA(boolean flag) {
this.currentIsA = flag;
notifyAll();
}
public static void main(String[] args) throws Exception {
PrintMain state = new PrintMain();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(new PrintB(state));
executorService.execute(new PrintA(state));
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("Done");
System.exit(0);
}
}
打印A的线程(首先打印A):
package com.cuttyfox.tests.self.version1;
import java.util.concurrent.TimeUnit;
public class PrintA implements Runnable{
private PrintMain state;
public PrintA(PrintMain state) {
this.state = state;
}
public void run() {
try {
while (!Thread.interrupted()){
System.out.println("Print A");
TimeUnit.SECONDS.sleep(1);
this.state.setCurrentIsA(true);
this.state.waitingForPrintingB();
}
} catch (InterruptedException e) {
System.out.println("Exit through Interrupting.");
}
}
}
打印B的线程:
package com.cuttyfox.tests.self.version1;
import java.util.concurrent.TimeUnit;
public class PrintB implements Runnable{
private PrintMain state;
public PrintB(PrintMain state) {
this.state = state;
}
public void run() {
try{
while (!Thread.interrupted()) {
this.state.waitingForPrintingA();
System.out.println("Print B");
TimeUnit.SECONDS.sleep(1);
this.state.setCurrentIsA(false);
}
} catch (InterruptedException e) {
System.out.println("Exit through Interrupting.");
}
}
}
Java对象本身有对象锁,故这里没有像Python中那样需要显式通过创建一个Condition对象来得到一把锁。
使用Python实现交替打印abcdef的过程:
import threading
import time
import functools
from collections import deque
LETTERS = [chr(code) for code in range(97, 97+6)]
LENGTH = len(LETTERS)
class State(object):
def __init__(self):
self.condition = threading.Condition()
self.index_value = 0
def set_next_index(self, index):
with self.condition:
self.index_value = index
self.condition.notify_all()
def wait_for(self, index_value):
with self.condition:
while not self.index_value == index_value:
self.condition.wait()
def print_letter(state: State, wait_ident: int):
print('Got: {}!'.format(wait_ident))
while True:
state.wait_for(wait_ident)
time.sleep(2)
print(LETTERS[state.index_value])
print('PRINT: {} AND SET NEXT: {}'.format(state.index_value,
(state.index_value + 1) % LENGTH
))
state.set_next_index((state.index_value + 1) % LENGTH)
state = State()
d = deque()
d.extend(range(LENGTH))
d.rotate(1)
print(d)
threads = []
for wait_ident in d:
t = threading.Thread(target=functools.partial(print_letter, state, wait_ident))
threads.append(t)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
来源:https://blog.csdn.net/cnweike/article/details/80939225
猜你喜欢
- 回调函数回调函数是一个对象(实现了特定方法的类实例),它在调用fit()时被传入模型,并在训练过程中的不同时间点被模型调用可以访问关于模型状
- asp使用fso对象遍历目录及目录下的文件代码:<%@ Language=VBScript %><%&
- 前几天,Opera宣布其用户已经超过1亿——桌面版和手机版均超过5000万。Opera Mini是一个很优秀的手机浏览器,对手机用户而言,O
- 引言https://github.com/go-chassis/go-chassis是一个微服务开发框架,而微服务开发框架带来的其中一个课题
- 本文实例讲述了Django发送html邮件的方法。分享给大家供大家参考。具体如下:在Django中,发送邮件非常的方便,一直没有时间,今天来
- 1.命名规范1.库名、表名、字段名必须使用小写字母,并采用下划线分割。a)MySQL有配置参数lower_case_table_names,
- Python 字符串格式化使用 "字符 %格式1 %格式2 字符"%(变量1,变量2),%格式表示接受变量的类型。简单的
- 不知道大家在做页面的时候会不会遇到样式定义不生效的问题,基本的表现就是怎么改样式都没显示或只有某些浏览器正常,这时通常需要做下面的几步:确认
- 一 方法汇总在 Python 进程中,有几种方法可以实现数据交互:共享内存:这是一种用于进程间通信的高效方式。多个进程可以访问同一个共享内存
- <%@LANGUAGE="VBSCRIPT" CODEPAGE="936"%>
- 使用非常简单,小伙伴们只要修改对应的参数即可,这里就不多废话了,直接奉上实例吧。<div class="txt1"
- 前言数组类型是各种编程语言中基本的数组结构了,本文来盘点下Python中各种“数组”类型的实现。listtuplearray.arrayst
- 纵观各大编程语言在 2017 年的发展情况,我们会发现涌现出诸如 Go、Swift 这类后起之秀,而其中最为耀眼的当属 Python。之所以
- 栅格就是你对页面版式的规划你日常所见的许多页面都有栅格存在。你可能注意不到,但它确实存在,并且支撑着设计内容,建立整体的架构,引导着页面的元
- 介绍我们可以通过for循环来迭代list、tuple、dict、set、字符串,dict比较特殊dict的存储不是连续的,所以迭代(遍历)出
- 创建索引:MySql创建索引的语法如下:CREATE [UNIQUE|FULLTEXT|SPATIAL] INDEX index_name
- 定时将源文件或目录使用WinRAR压缩并自动备份到本地或网络上的主机1.确保WinRAR安装在默认路径或者把WinRAR.exe添加到环境变
- 前言对MySQL有研究的读者,可能会发现MySQL更新很快,在安装方式上,MySQL提供了两种经典安装方式:解压式和一键式,虽然是两种安装方
- 本文实例讲述了python通过pil为png图片填充上背景颜色的方法。分享给大家供大家参考。具体分析如下:png图片有些是没有背景颜色,如果
- 一、标签语法由%}和 {% 来定义的,例如:{%tag%} {%endtag%},完整的标签有开始就有结束,如条件语句,有条件判断的开始,也