Python异步编程-asyncio详解
目录
asyncio简介
示例
首先,我们来看一个简单的Hello World示例代码:
import asyncio
async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')
asyncio.run(main())
"""
输出:
Hello ...
... World!
"""
这个例子展示了如何使用 asyncio
库来编写并发代码。通过 async
和 await
语法,我们可以让 Python 程序在执行IO操作(如计算、文件读写、网络请求等)时实现在其他任务间高效切换,从而提升程序性能。
什么是 asyncio?
asyncio
是一个用于编写并发代码的Python库,允许我们利用 async
和 await
关键字进行异步编程。作为多个Python异步框架的基础,asyncio
提供了诸如高性能网络和Web服务器、数据库连接库以及分布式任务队列等功能。
适用场景
asyncio
非常适合用于 IO密集型和高层次结构化网络代码处理。其高效的异步IO处理方式,使其在需要大量网络通信和异步操作的场景中表现优异。
API
高级API用于:
- 并发运行Python协程,完全控制它们的执行;
- 执行网络IO和进程间通信(IPC);
- 控制子进程;
- 通过队列分发任务;
- 同步并发代码。
低级API用于(库和框架开发人):
- 创建和管理事件循环,提供异步API实现网络通信、运行子进程、处理OS信号等;
- 使用 transports 实现高效率协议;
- 用异步语法桥接基于回调的库和代码。
asyncio的使用
可等待对象
什么是可等待对象?
简单来说,可等待对象是可以在await
表达式中使用的对象。它们可以暂停异步函数的执行,等待某个操作完成后再恢复执行。Python中的可等待对象主要包括三种类型:
- 协程对象(coroutine objects)
- 任务对象(tasks)
- Future对象(futures)
协程对象
协程函数是定义时使用async def
语句的函数。当调用协程函数时,会返回一个协程对象。这些对象必须在事件循环中运行,可以直接被await
。
import asyncio
async def main():
await asyncio.sleep(1)
print("Hello, world!")
# 运行协程
asyncio.run(main())
在上面的示例中,main()
是一个协程函数,调用它返回一个协程对象。asyncio.run(main())
将运行事件循环并执行协程。
任务对象
任务对象是对协程对象的进一步封装,被用来“并行的”调度协程,它们会安排协程在事件循环中执行,并可以跟踪协程的状态和结果。可以通过asyncio.create_task
函数创建任务,当一个协程通过 asyncio.create_task() 等函数被封装为一个任务,该协程会被自动调度执行。
import asyncio
async def say_hello():
await asyncio.sleep(1)
print("Hello!")
async def main():
task = asyncio.create_task(say_hello())
await task
asyncio.run(main())
在这个示例中,我们使用asyncio.create_task
创建了一个任务对象,该对象随后被await
,这意味着程序将等待任务完成。
Future对象
Future对象表示一个将来可能会有结果的操作,他们主要是用于低级别的异步编程。通常情况下,没有必要在应用层级的代码中创建 Future 对象。开发者更多使用高层次的抽象如任务对象,但了解Future对象仍然很有价值。
import asyncio
async def set_future_value(fut):
await asyncio.sleep(1)
fut.set_result("Finished!")
async def main():
fut = asyncio.Future()
await asyncio.create_task(set_future_value(fut))
result = await fut
print(result)
asyncio.run(main())
在这个示例中,通过asyncio.Future()
创建了一个Future对象,并在一个协程中使用set_result
方法设置了其结果。
协程
什么是协程?
协程(Coroutine)是一种比线程更轻量级的“并发”方式。它允许程序在同一个线程里“并行”地执行多个任务。这里“并行”并不是指真正的并行执行,而是协程可以在任务之间快速切换,从而让这些任务看起来像是同时进行的。
你可以把协程想象成一个大办公室里的一名员工,这名员工需要完成一些任务,比如接电话、发邮件、写报告。这些任务可能需要等一段时间才能完成,比如等电话的对方回复,等邮件发送成功,或者等等数据。但是在等待的时间里,这名员工不会闲着,他会继续去做别的任务。
- 线程就像是一个员工每做一个任务他就需要一个独立的办公桌。线程是重量级的,需要更多资源,启动和管理也更复杂。
- 协程就像是一个员工在同一个办公桌上同时处理多个任务,快速切换。协程是轻量级的,消耗的资源很少,启动和管理也比较简单。
基本使用
- 定义协程函数:使用 async def 关键字定义一个协程函数。
async def my_coroutine():
pass
- 运行协程:可以使用 await 关键词等待另一个协程完成,或使用 asyncio.run() 来运行最顶层的入口点协程。
import asyncio
async def my_coroutine():
print("这是使用await运行的协程")
async def main():
print("使用asyncio.run运行协程开始")
await my_coroutine()
print("使用asyncio.run运行协程结束")
asyncio.run(main())
"""
输出:
使用asyncio.run运行协程开始
这是使用await运行的协程
使用asyncio.run运行协程结束
"""
注意:简单地调用一个协程并不会使其被调度执行
# python console 中运行
async def my_coroutine():
print("my coroutine")
my_coroutine()
<coroutine object my_coroutine at 0x104519f50>
运行协程
-
使用asyncio.run() 函数用来运行最顶层的入口点 “main()” 函数 (见上面的示例)
-
使用await执行(以下代码段会在等待3秒后打印 "1号协程"结束时间,然后再次等待5秒后打印 "2号协程"完成时间)
import asyncio import time async def my_coroutine(name, delay): await asyncio.sleep(delay) # 模拟I/O操作 print(f"{name}完成时间:{time.time()}") async def main(): start_time = time.time() print(f"开始时间 {start_time}") await my_coroutine("1号协程", 3) await my_coroutine("2号协程", 5) end_time = time.time() print(f"结束时间 {end_time}") print(f"耗时{end_time - start_time:.2f}秒") asyncio.run(main()) """ 输出: 开始时间 1726210238.44529 1号协程完成时间:1726210241.446931 2号协程完成时间:1726210246.4494321 结束时间 1726210246.449468 耗时8.00秒 """
-
asyncio.create_task() 函数用来并发运行作为 asyncio 任务的多个协程。(修改以上示例,并发运行两个协程)
import asyncio import time async def my_coroutine(name, delay): await asyncio.sleep(delay) # 模拟I/O操作 print(f"{name}完成时间:{time.time()}") async def main(): start_time = time.time() print(f"开始时间 {start_time}") task1 = asyncio.create_task(my_coroutine("1号协程", 3)) task2 = asyncio.create_task(my_coroutine("2号协程", 5)) await task1 await task2 end_time = time.time() print(f"结束时间 {end_time}") print(f"耗时{end_time - start_time:.2f}秒") asyncio.run(main()) """ 输出: 开始时间 1726210659.84198 1号协程完成时间:1726210662.843559 2号协程完成时间:1726210664.842549 结束时间 1726210664.84267 耗时5.00秒 """ # 可以明显看出总耗时比之前明显快了3秒,1和2之间的时间间隔也变成了2秒
-
asyncio.TaskGroup 类提供了 create_task() 的替代。 使用此 API,之前的例子可以改为
async def main(): start_time = time.time() print(f"开始时间 {start_time}") async with asyncio.TaskGroup() as tg: task1 = tg.create_task(my_coroutine("1号协程", 3)) task2 = tg.create_task(my_coroutine("2号协程", 5)) end_time = time.time() print(f"结束时间 {end_time}") print(f"耗时{end_time - start_time:.2f}秒")
Task
什么是 Task?
在 asyncio 中,Task 是对协程进行调度管理的对象。Task 实际上是 asyncio 事件循环的一个抽象概念,通过 Task 我们可以控制协程(coroutine)的执行,允许它们并发运行。在底层,Task 使用事件循环调度多个协程,使得它们似乎是同时运行的。
asyncio.Task
对象可以被看作是 Future
的一种特化,用于运行 Python 协程。它们被设计用来在事件循环中调度和运行协程。
Task对象是非线程安全的,意味着它们主要用于单线程的 asyncio 事件循环。
创建 Task
-
使用
asyncio.create_task()
方法,
asyncio.create_task()
是创建 Task 的最常见方法,它会立即调度协程的运行并返回一个 Task 对象:import asyncio import time async def my_coroutine(): print(f"协程开始时间 {time.strftime('%X')}") await asyncio.sleep(2) print(f"协程结束时间 {time.strftime('%X')}") async def main(): print(f"主协程开始时间:{time.strftime('%X')}") # 创建一个 Task task = asyncio.create_task(my_coroutine()) print(f"任务创建时间:{time.strftime('%X')}") # 稍微等待一下,但不会 await Task await asyncio.sleep(5) print(f"延时结束,开始等待任务:{time.strftime('%X')}") # 现在等待 Task 完成 await task print(f"主协程结束时间:{time.strftime('%X')}") # 运行主协程 asyncio.run(main()) """ 输出: 主协程开始时间:17:45:30 任务创建时间:17:45:30 协程开始时间 17:45:30 协程结束时间 17:45:32 延时结束,开始等待任务:17:45:35 主协程结束时间:17:45:35 """
在上面的示例中,
asyncio.create_task(my_coroutine())
创建了一个 Task,它会立即开始运行my_coroutine
协程,即使我还并没有执行await。 -
使用
loop.create_task()
方法我们还可以通过获取事件循环,然后调用它的
create_task
方法来创建任务:import asyncio import time async def my_coroutine(): print(f"协程开始时间 {time.strftime('%X')}") await asyncio.sleep(2) print(f"协程结束时间 {time.strftime('%X')}") async def main(): print(f"主协程开始时间:{time.strftime('%X')}") # 创建一个 Task loop = asyncio.get_running_loop() task = loop.create_task(my_coroutine()) print(f"任务创建时间:{time.strftime('%X')}") # 稍微等待一下,但不会 await Task await asyncio.sleep(5) print(f"延时结束,开始等待任务:{time.strftime('%X')}") # 现在等待 Task 完成 await task print(f"主协程结束时间:{time.strftime('%X')}") # 运行主协程 asyncio.run(main()) """ 输出: 主协程开始时间:17:50:03 任务创建时间:17:50:03 协程开始时间 17:50:03 协程结束时间 17:50:05 延时结束,开始等待任务:17:50:08 主协程结束时间:17:50:08 """
在上面的示例中,使用
asyncio.get_running_loop()
获取当前正在运行的事件循环,loop.create_task(my_coroutine())
使用事件循环的create_task
方法创建并调度一个协程任务。
asyncio.create_task()
与loop.create_task()
的不同之处:asyncio.create_task()
:是一个便捷方法,直接通过当前的默认事件循环创建任务loop.create_task()
:需要明确提供事件循环,适用于更复杂或特定需求的场景,比如管理多个事件循环。
-
使用
asyncio.ensure_future()
虽然不如前两种方法常用,但asyncio.ensure_future
也可以用来创建 Task。它可以接受协程或 Future 对象,并确保返回一个 Task:import asyncio import time async def my_coroutine(): print(f"协程开始时间 {time.strftime('%X')}") await asyncio.sleep(2) print(f"协程结束时间 {time.strftime('%X')}") async def main(): print(f"主协程开始时间:{time.strftime('%X')}") # 创建一个 Task task = asyncio.ensure_future(my_coroutine()) print(f"任务创建时间:{time.strftime('%X')}") # 稍微等待一下,但不会 await Task await asyncio.sleep(5) print(f"延时结束,开始等待任务:{time.strftime('%X')}") # 现在等待 Task 完成 await task print(f"主协程结束时间:{time.strftime('%X')}") # 运行主协程 asyncio.run(main()) """ 输出: 主协程开始时间:18:00:54 任务创建时间:18:00:54 协程开始时间 18:00:54 协程结束时间 18:00:56 延时结束,开始等待任务:18:00:59 主协程结束时间:18:00:59 """
asyncio.ensure_future()
是一个功能强大的函数,常用于将一个协程转换为一个 Future 对象。
它在处理异步任务时提供了更多的灵活性,特别是在需要将协程包装为 Future 时。
asyncio.create_task()
和asyncio.ensure_future()
的不同之处:asyncio.create_task()
:专门用于将协程转换为 Task,只能处理协程对象。asyncio.ensure_future()
:可以处理协程对象和 Future 对象,更加通用,适用于更多场景。
取消 Task
在 asyncio
中,当一个 Task
对象的 cancel()
方法被调用时,它会请求取消该任务。具体步骤如下:
- 标记任务为取消状态:调用
cancel()
方法后,任务会被标记为取消状态。 - 抛出
CancelledError
异常:再次调度这个任务时,它会在等待的位置抛出一个asyncio.CancelledError
异常。 - 任务处理异常:协程内部可以捕获这个异常,进行相应的清理操作。
import asyncio
async def cancellable_task():
try:
print("Task 启动")
await asyncio.sleep(10) # 长时间任务
print("Task 完成")
except asyncio.CancelledError:
print("Task 被取消")
raise # 重新抛出err,以便外部可以检测到任务已被取消
async def main():
task = asyncio.create_task(cancellable_task())
await asyncio.sleep(2) # 等待一段时间
task.cancel() # 请求取消任务
try:
await task
except asyncio.CancelledError:
print("主协程: Task 被取消")
asyncio.run(main())
"""
输出:
Task 启动
Task 被取消
主协程: Task 被取消
"""
在这个例子中,main()
协程启动了一个长时间运行的任务 cancellable_task()
并在2秒后请求取消它。
Task 异常获取
在 asyncio
中,Task
对象继承了 Future
对象的许多方法和属性,其中包括 exception()
方法。exception()
用于获取任务在执行过程中抛出的异常。如果任务完成且没有异常发生,exception()
返回 None
。如果任务还未完成,调用 exception()
将会引发 asyncio.InvalidStateError
异常。因此,通常我们需要在任务完成之后调用 exception()
方法。
import asyncio
async def my_task():
await asyncio.sleep(1)
raise ValueError("任务执行出错")
async def main():
task = asyncio.create_task(my_task())
try:
await task
except Exception as e:
print(f"主协程中捕获异常: {e}")
# 现在任务已经完成,可以检查异常
if task.exception():
print(f"任务结束通过exception方法检查异常: {task.exception()}")
asyncio.run(main())
"""
输出:
主协程中捕获异常: 任务执行出错
任务结束通过exception方法检查异常: 任务执行出错
"""
在这个示例中,my_task()
会抛出一个 ValueError
异常。我们在主协程 main()
中捕获该异常,同时也通过 exception()
方法再次获取并打印异常。
Task 回调
add_done_callback()
方法是 asyncio
提供的一个强大的工具,允许我们在任务完成后执行特定的回调函数。回调函数帮助我们更有效地管理任务的生命周期,处理结果和异常,并执行一些后续操作。
import asyncio
async def my_task():
await asyncio.sleep(1) # 模拟一些异步操作
return "一键三连"
def task_done_callback(future):
print(f"回调-任务已完成,结果: {future.result()}")
async def main():
task = asyncio.create_task(my_task())
task.add_done_callback(task_done_callback)
await task # 等待任务完成
asyncio.run(main())
"""
输出:
回调-任务已完成,结果: 一键三连
"""
在这个例子中,task_done_callback
回调函数会在 my_task
任务完成后被调用,并打印任务的结果。
import asyncio
async def failing_task():
await asyncio.sleep(1)
raise ValueError("任务出现了奇怪的错误")
def task_done_callback(future):
if future.exception():
print(f"回调-任务失败,异常: {future.exception()}")
else:
print(f"回调-任务完成,结果: {future.result()}")
async def main():
task = asyncio.create_task(failing_task())
task.add_done_callback(task_done_callback)
try:
await task
except Exception as e:
print(f"主协程捕获异常: {e}")
asyncio.run(main())
"""
输出:
回调-任务失败,异常: 任务出现了奇怪的错误
主协程捕获异常: 任务出现了奇怪的错误
"""
在这个示例中,当 failing_task
抛出异常时,task_done_callback
会检测并打印异常,而主协程也会捕获并处理该异常。
TaskGroup
什么是 TaskGroup?
TaskGroup
是 Python 3.11 中新增的 asyncio
组件。它提供了一种更简洁、更安全的方式来管理多个并发任务。TaskGroup
是一个上下文管理器,当与 async with
语句一起使用时,它允许我们在一个块内启动多个任务,并确保这些任务在上下文管理器退出时正确清理。
为什么使用 TaskGroup?
- 更简洁的语法:在没有
TaskGroup
之前,管理多个任务通常需要手动创建每个任务并在最后通过await
语句等待所有任务完成。TaskGroup
简化了这一过程。 - 更好的错误处理:由于
TaskGroup
是一个上下文管理器,它更容易管理任务中的异常情况。 - 更清晰的结构:代码的可读性和结构性得到了显著提升。
创建任务
在 TaskGroup
中创建任务使用 create_task
方法。每个任务会立即调度,并在 TaskGroup
的管理范围内运行。
import asyncio
import time
async def task(n):
print(f"任务 {n} 启动 {time.strftime('%X')}")
await asyncio.sleep(2)
print(f"任务 {n} 结束 {time.strftime('%X')}")
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(task(1))
tg.create_task(task(2))
tg.create_task(task(3))
asyncio.run(main())
"""
输出:
任务 1 启动 17:08:18
任务 2 启动 17:08:18
任务 3 启动 17:08:18
任务 1 结束 17:08:20
任务 2 结束 17:08:20
任务 3 结束 17:08:20
"""
在这个示例中,我们通过 async with
语句创建了一个 TaskGroup
,并使用 create_task
方法启动了三个并行运行的任务 task(n)
,这三个任务立即调度,并在 TaskGroup
的管理范围内运行。当所有任务完成时,TaskGroup
会自动进行清理。
异常处理
当 TaskGroup
中的任务引发异常时,异常会在退出 async with
块时处理。如果多个任务引发异常,TaskGroup
会聚合这些异常,并引发一个 ExceptionGroup
异常。
import asyncio
async def error_task():
raise RuntimeError("一些奇怪的错误")
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(error_task())
tg.create_task(error_task())
tg.create_task(error_task())
except ExceptionGroup as e:
print("任务组捕获异常: ", e)
asyncio.run(main())
"""
输出:
任务组捕获异常: unhandled errors in a TaskGroup (3 sub-exceptions)
"""
在使用 asyncio.TaskGroup
时,如果多个任务引发异常,异常会被聚合成一个 ExceptionGroup
异常,并在 TaskGroup
上下文管理器退出时被捕获和处理。然而,默认情况下,ExceptionGroup
只提供较为简略的信息。要看到具体的子异常信息,我们需要更详细地打印 ExceptionGroup
对象。
import asyncio
async def error_task():
raise RuntimeError("一些奇怪的错误")
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(error_task())
tg.create_task(error_task())
tg.create_task(error_task())
except ExceptionGroup as e:
print("任务组捕获异常:")
for sub_exception in e.exceptions:
print(f"子异常: {sub_exception}")
asyncio.run(main())
"""
输出:
任务组捕获异常:
子异常: 一些奇怪的错误
子异常: 一些奇怪的错误
子异常: 一些奇怪的错误
"""
在上面的示例中,我们在捕获 ExceptionGroup
异常后,迭代其 exceptions
属性,逐个打印出子异常的信息。这样可以更全面了解 ExceptionGroup
中包含的所有异常。
同步任务完成
TaskGroup
保证所有任务在同一个上下文管理器范围内完成。如果某个任务需要较长时间完成,其他任务会等待它。
import asyncio
async def long_task():
await asyncio.sleep(5)
print("长任务完成")
async def short_task():
await asyncio.sleep(1)
print("短任务完成")
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(long_task())
tg.create_task(short_task())
print("所有任务完成")
asyncio.run(main())
"""
输出:
短任务完成
长任务完成
所有任务完成
"""
上面的示例展示了如何使用 asyncio.TaskGroup
同时管理多个异步任务,其中短任务先完成并输出结果,长任务随后完成,最终确保所有任务结束后输出 “所有任务完成”。
原文地址:https://blog.csdn.net/youngwyj/article/details/142139046
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!