python协程与 asyncio 库详情
作者:梦想橡皮擦 发布时间:2023-08-23 12:08:36
前言:
python 中协程概念是从 3.4 版本增加的,但 3.4 版本采用是生成器实现,为了将协程和生成器的使用场景进行区分,使语义更加明确,在 python 3.5 中增加了 async
和 await
关键字,用于定义原生协程。
1.asyncio 异步 I/O 库
python 中的 asyncio 库提供了管理事件、协程、任务和线程的方法,以及编写并发代码的原语,即 async
和 await
。
该模块的主要内容:
事件循环:event_loop,管理所有的事件,是一个无限循环方法,在循环过程中追踪事件发生的顺序将它们放在队列中,空闲时则调用相应的事件处理者来处理这些事件;
协程:
coroutine
,子程序的泛化概念,协程可以在执行期间暂停,等待外部的处理(I/O 操作)完成之后,再从暂停的地方继续运行,函数定义式使用async
关键字,这样这个函数就不会立即执行,而是返回一个协程对象;Future
和Task
:Future
对象表示尚未完成的计算,Task
是Future
的子类,包含了任务的各个状态,作用是在运行某个任务的同时可以并发的运行多个任务。
异步函数的定义
异步函数本质上依旧是函数,只是在执行过程中会将执行权交给其它协程,与普通函数定义的区别是在 def
关键字前增加 async
。
# 异步函数
import asyncio
# 异步函数
async def func(x):
print("异步函数")
return x ** 2
ret = func(2)
print(ret)
运行代码输入如下内容:
sys:1: RuntimeWarning: coroutine 'func' was never awaited
<coroutine object func at 0x0000000002C8C248>
函数返回一个协程对象,如果想要函数得到执行,需要将其放到事件循环 event_loop
中。
事件循环 event_loop
event_loop
是 asyncio
模块的核心,它将异步函数注册到事件循环上。 过程实现方式为:由 loop
在适当的时候调用协程,这里使用的方式名为 asyncio.get_event_loop()
,然后由 run_until_complete(协程对象)
将协程注册到事件循环中,并启动事件循环。
import asyncio
# 异步函数
async def func(x):
print("异步函数")
return x ** 2
# 协程对象,该对象不能直接运行
coroutine1 = func(2)
# 事件循环对象
loop = asyncio.get_event_loop()
# 将协程对象加入到事件循环中,并执行
ret = loop.run_until_complete(coroutine1)
print(ret)
首先在 python 3.7 之前的版本中使用异步函数是安装上述流程:
先通过
asyncio.get_event_loop()
获取事件循环loop
对象;然后通过不同的策略调用
loop.run_until_complete()
或者loop.run_forever()
执行异步函数。
在 python 3.7 之后的版本,直接使用 asyncio.run()
即可,该函数总是会创建一个新的事件循环并在结束时进行关闭。
最新的官方文档 都采用的是run
方法。 官方案例
import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
asyncio.run(main())
接下来在查看一个完整的案例,并且结合await
关键字。
import asyncio
import time
# 异步函数1
async def task1(x):
print("任务1")
await asyncio.sleep(2)
print("恢复任务1")
return x
# 异步函数2
async def task2(x):
print("任务2")
await asyncio.sleep(1)
print("恢复任务2")
return x
async def main():
start_time = time.perf_counter()
ret_1 = await task1(1)
ret_2 = await task2(2)
print("任务1 返回的值是", ret_1)
print("任务2 返回的值是", ret_2)
print("运行时间", time.perf_counter() - start_time)
if __name__ == '__main__':
# 创建一个事件循环
loop = asyncio.get_event_loop()
# 将协程对象加入到事件循环中,并执行
loop.run_until_complete(main())
代码输出如下所示:
任务1
恢复任务1
任务2
恢复任务2
任务1 返回的值是 1
任务2 返回的值是 2
运行时间 2.99929154
上述代码创建了 3 个协程,其中 task1
和 task2
都放在了协程函数 main
中,I/O 操作通过 asyncio.sleep(1)
进行模拟,整个函数运行时间为 2.9999 秒,接近 3 秒,依旧是串行进行,如果希望修改为并发执行,将代码按照下述进行修改。
import asyncio
import time
# 异步函数1
async def task1(x):
print("任务1")
await asyncio.sleep(2)
print("恢复任务1")
return x
# 异步函数2
async def task2(x):
print("任务2")
await asyncio.sleep(1)
print("恢复任务2")
return x
async def main():
start_time = time.perf_counter()
ret_1,ret_2 = await asyncio.gather(task1(1),task2(2))
print("任务1 返回的值是", ret_1)
print("任务2 返回的值是", ret_2)
print("运行时间", time.perf_counter() - start_time)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
上述代码最大的变化是将task1
和task2
放到了asyncio.gather()
中运行,此时代码输出时间明显变短。
任务1
任务2
恢复任务2 # 任务2 由于等待时间短,先返回。
恢复任务1
任务1 返回的值是 1
任务2 返回的值是 2
运行时间 2.0005669480000003
asyncio.gather()
可以更换为asyncio.wait()
,修改代码如下所示:
import asyncio
import time
# 异步函数1
async def task1(x):
print("任务1")
await asyncio.sleep(2)
print("恢复任务1")
return x
# 异步函数2
async def task2(x):
print("任务2")
await asyncio.sleep(1)
print("恢复任务2")
return x
async def main():
start_time = time.perf_counter()
done, pending = await asyncio.wait([task1(1), task2(2)])
print(done)
print(pending)
print("运行时间", time.perf_counter() - start_time)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
asyncio.wait()
返回一个元组,其中包含一个已经完成的任务集合,一个未完成任务的集合。
gather 和 wait 的区别:
gather
:需要所有任务都执行结束,如果任意一个协程函数崩溃了,都会抛异常,不会返回结果;wait
:可以定义函数返回的时机,可以设置为FIRST_COMPLETED
(第一个结束的),FIRST_EXCEPTION
(第一个出现异常的),ALL_COMPLETED
(全部执行完,默认的)。
done,pending = await asyncio.wait([task1(1),task2(2)],return_when=asyncio.tasks.FIRST_EXCEPTION)
创建 task
由于协程对象不能直接运行,在注册到事件循环时,是run_until_complete
方法将其包装成一个 task
对象。该对象是对coroutine
对象的进一步封装,它比coroutine
对象多了运行状态,例如 pending
,running
,finished
,可以利用这些状态获取协程对象的执行情况。
下面显示的将coroutine
对象封装成task
对象,在上述代码基础上进行修改。
import asyncio
import time
# 异步函数1
async def task1(x):
print("任务1")
await asyncio.sleep(2)
print("恢复任务1")
return x
# 异步函数2
async def task2(x):
print("任务2")
await asyncio.sleep(1)
print("恢复任务2")
return x
async def main():
start_time = time.perf_counter()
# 封装 task 对象
coroutine1 = task1(1)
task_1 = loop.create_task(coroutine1)
coroutine2 = task2(2)
task_2 = loop.create_task(coroutine2)
ret_1, ret_2 = await asyncio.gather(task_1, task_2)
print("任务1 返回的值是", ret_1)
print("任务2 返回的值是", ret_2)
print("运行时间", time.perf_counter() - start_time)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
由于task
对象是future
对象的子类对象,所以上述代码也可以按照下述内容修改:
# task_2 = loop.create_task(coroutine2)
task_2 = asyncio.ensure_future(coroutine2)
下面将task
对象的各个状态进行打印输出。
import asyncio
import time
# 异步函数1
async def task1(x):
print("任务1")
await asyncio.sleep(2)
print("恢复任务1")
return x
# 异步函数2
async def task2(x):
print("任务2")
await asyncio.sleep(1)
print("恢复任务2")
return x
async def main():
start_time = time.perf_counter()
# 封装 task 对象
coroutine1 = task1(1)
task_1 = loop.create_task(coroutine1)
coroutine2 = task2(2)
# task_2 = loop.create_task(coroutine2)
task_2 = asyncio.ensure_future(coroutine2)
# 进入 pending 状态
print(task_1)
print(task_2)
# 获取任务的完成状态
print(task_1.done(), task_2.done())
# 执行任务
await task_1
await task_2
# 再次获取完成状态
print(task_1.done(), task_2.done())
# 获取返回结果
print(task_1.result())
print(task_2.result())
print("运行时间", time.perf_counter() - start_time)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
await task_1
表示的是执行该协程,执行结束之后,task.done()
返回 True
,task.result()
获取返回值。
回调返回值
当协程执行完毕,需要获取其返回值,刚才已经演示了一种办法,使用 task.result()
方法获取,但是该方法仅当协程运行完毕时,才能获取结果,如果协程没有运行完毕,result()
方法会返回 asyncio.InvalidStateError
(无效状态错误)。
一般编码都采用第二种方案,通过add_done_callback()
方法绑定回调。
import asyncio
import requests
async def request_html():
url = 'https://www.csdn.net'
res = requests.get(url)
return res.status_code
def callback(task):
print('回调:', task.result())
loop = asyncio.get_event_loop()
coroutine = request_html()
task = loop.create_task(coroutine)
# 绑定回调
task.add_done_callback(callback)
print(task)
print("*"*100)
loop.run_until_complete(task)
print(task)
上述代码当coroutine
执行完毕时,会调用callback
函数。
如果回调函数需要多个参数,请使用functools
模块中的偏函数(partial
)方法
循环事件关闭
建议每次编码结束之后,都调用循环事件对象close()
方法,彻底清理loop
对象。
2.本节爬虫项目
本节课要采集的站点由于全部都是 coser 图片,所以地址在代码中查看即可。
完整代码如下所示:
import threading
import asyncio
import time
import requests
import lxml
from bs4 import BeautifulSoup
async def get(url):
return requests.get(url)
async def get_html(url):
print("准备抓取:", url)
res = await get(url)
return res.text
async def save_img(img_url):
# thumbMid_5ae3e05fd3945 将小图替换为大图
img_url = img_url.replace('thumb','thumbMid')
img_url = "http://mycoser.com/" + img_url
print("图片下载中:", img_url)
res = await get(img_url)
if res is not None:
with open(f'./imgs/{time.time()}.jpg', 'wb') as f:
f.write(res.content)
return img_url,"ok"
async def main(url_list):
# 创建 5 个任务
tasks = [asyncio.ensure_future(get_html(url_list[_])) for _ in range(len(url_list))]
dones, pending = await asyncio.wait(tasks)
for task in dones:
html = task.result()
soup = BeautifulSoup(html, 'lxml')
divimg_tags = soup.find_all(attrs={'class': 'workimage'})
for div in divimg_tags:
ret = await save_img(div.a.img["data-original"])
print(ret)
if __name__ == '__main__':
urls = [f"http://mycoser.com/picture/lists/p/{page}" for page in range(1, 17)]
totle_page = len(urls) // 5 if len(urls) % 5 == 0 else len(urls) // 5 + 1
# 对 urls 列表进行切片,方便采集
for page in range(0, totle_page):
start_page = 0 if page == 0 else page * 5
end_page = (page + 1) * 5
# 循环事件对象
loop = asyncio.get_event_loop()
loop.run_until_complete(main(urls[start_page:end_page]))
代码说明:上述代码中第一个要注意的是await
关键字后面只能跟如下内容:
原生的协程对象;
一个包含
await
方法的对象返回的一个迭代器。
所以上述代码get_html
函数中嵌套了一个协程 get
。主函数 main
里面为了运算方便,直接对 urls 进行了切片,然后通过循环进行运行。
当然上述代码的最后两行,可以直接修改为:
# 循环事件对象
# loop = asyncio.get_event_loop()
#
# loop.run_until_complete(main(urls[start_page:end_page]))
asyncio.run(main(urls[start_page:end_page]))
轻松获取一堆高清图片:
来源:https://juejin.cn/post/7077795582311399438
猜你喜欢
- 晚上突然间看到大猫的头像在闪动,速度打开一看,发现他问,以前我写button标签的时候有没有写type属性,老实的我只有诚实地告诉他,我没写
- 什么是deferdefer用来声明一个延迟函数,把这个函数放入到一个栈上, 当外部的包含方法return之前,返回参数到调用方法之前调用,也
- 在cssplay网站看到有一组CSS像素画,于是也想摩仿一下,于是在网络上找到一组头像图标,看其结构比较简单,就拿它开刀吧!先看看预览图图一
- 相关文章推荐:各种loading加载图标下载 gif格式loadinfo和ajaxload一样,也是一个在线Ajax载入动画生成工
- asp抓取网页。偶要实现实实更新天气预报。利用了XMLHTTP组件,抓取网页的指定部分。很多小偷查询都是使用这个方法来实现的。需要分件htm
- 今天好不容易闲下来半天,所以和大家分享一下我之前总结的一套Web UI 设计命名规范,也就是网站用户界面设计(俗称网页设计)命名规范。这套规
- 负责为网页编程语言提供标准化服务的W3C组织(World Wide Web Consortium)近日开始修改超文本标记语言的定义,计划为该
- 阅读前:Pro Javascript Techniques翻译连载说明和目录JavaScript的演化是渐进而稳固的。历经过去十年的进程,J
- 一、准备工作:1.建立一个 WINDOWS 用户,设置为管理员权限,并设置密码,作为发布快照文件的有效访问用户。2.在SQL SERVER下
- getattr函数(1)使用 getattr 函数,可以得到一个直到运行时才知道名称的函数的引用。>>> li = [&q
- 为了防止网络上日益猖獗的垃圾广告和灌水评论,大多数网站在信息发布的时候要求输入验证码。图片、文字、字母甚至还有计算题。验证码图片里的信息东颠
- 选择正确的数据列类型能大大提高数据库的性能和使数据库具有高扩展性。在选择数据列类型时,请从以下几个方面考虑:存放到数据列中的数据类型。数据值
- 大家都知道连续的英文或数字能是容器被撑大,不能根据容器的大小自动换行,网页设计初学者可能不知道怎么处理,下面是CSS如何将他们换行的方法!对
- 一:操作redis1:redis拓展安装composer require predis/predis或者你也可以通过 PECL 安装&nbs
- <% '测试读取MySql数据库的内容strconnection="driver={mysql odbc 3.51 dri
- 北京邮电大学 张剑XML的局限性目前,许多Web网站的内容数据都存放在数据库或数据文件中。对于Web程序开发人员来说,如果要想把有用的信息从
- 如何用METADATA替换ADOVBS.INC? 在ASP中,使用组件时,如ADO,得先包含
- Oracle9i之前,中文是按照二进制编码进行排序的。在oracle9i中新增了按照拼音、部首、笔画排序功能。 1、
- Access SQL 函数 收藏 ▲日期/时间CDate 将字符串转化成为日期 select CDate("2005/
- Oracle的逻辑运算符也是用在SQL语句中必不可少的因素,一共有三个逻辑运算符意义and双值运算符,如果左右两个条件都为真,则得到的值就为