自学内容网 自学内容网

Python异步编程-asyncio详解

asyncio简介

示例

首先,我们来看一个简单的Hello World示例代码:

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

asyncio.run(main())


""" 
输出:
Hello ...
... World!
"""

这个例子展示了如何使用 asyncio 库来编写并发代码。通过 asyncawait 语法,我们可以让 Python 程序在执行IO操作(如计算、文件读写、网络请求等)时实现在其他任务间高效切换,从而提升程序性能。

什么是 asyncio?

asyncio 是一个用于编写并发代码的Python库,允许我们利用 asyncawait 关键字进行异步编程。作为多个Python异步框架的基础,asyncio 提供了诸如高性能网络和Web服务器、数据库连接库以及分布式任务队列等功能。

适用场景

asyncio 非常适合用于 IO密集型和高层次结构化网络代码处理。其高效的异步IO处理方式,使其在需要大量网络通信和异步操作的场景中表现优异。

API

高级API用于:

  • 并发运行Python协程,完全控制它们的执行;
  • 执行网络IO和进程间通信(IPC);
  • 控制子进程;
  • 通过队列分发任务;
  • 同步并发代码。

低级API用于(库和框架开发人):

  • 创建和管理事件循环,提供异步API实现网络通信、运行子进程、处理OS信号等;
  • 使用 transports 实现高效率协议;
  • 用异步语法桥接基于回调的库和代码。

asyncio的使用

可等待对象

什么是可等待对象?

简单来说,可等待对象是可以在await表达式中使用的对象。它们可以暂停异步函数的执行,等待某个操作完成后再恢复执行。Python中的可等待对象主要包括三种类型:

  1. 协程对象(coroutine objects)
  2. 任务对象(tasks)
  3. Future对象(futures)

协程对象

协程函数是定义时使用async def语句的函数。当调用协程函数时,会返回一个协程对象。这些对象必须在事件循环中运行,可以直接被await

import asyncio

async def main():
    await asyncio.sleep(1)
    print("Hello, world!")

# 运行协程
asyncio.run(main())

在上面的示例中,main()是一个协程函数,调用它返回一个协程对象。asyncio.run(main())将运行事件循环并执行协程。

任务对象

任务对象是对协程对象的进一步封装,被用来“并行的”调度协程,它们会安排协程在事件循环中执行,并可以跟踪协程的状态和结果。可以通过asyncio.create_task函数创建任务,当一个协程通过 asyncio.create_task() 等函数被封装为一个任务,该协程会被自动调度执行。

import asyncio

async def say_hello():
    await asyncio.sleep(1)
    print("Hello!")

async def main():
    task = asyncio.create_task(say_hello())
    await task

asyncio.run(main())

在这个示例中,我们使用asyncio.create_task创建了一个任务对象,该对象随后被await,这意味着程序将等待任务完成。

Future对象

Future对象表示一个将来可能会有结果的操作,他们主要是用于低级别的异步编程。通常情况下,没有必要在应用层级的代码中创建 Future 对象。开发者更多使用高层次的抽象如任务对象,但了解Future对象仍然很有价值。

import asyncio

async def set_future_value(fut):
    await asyncio.sleep(1)
    fut.set_result("Finished!")

async def main():
    fut = asyncio.Future()
    await asyncio.create_task(set_future_value(fut))
    result = await fut
    print(result)

asyncio.run(main())

在这个示例中,通过asyncio.Future()创建了一个Future对象,并在一个协程中使用set_result方法设置了其结果。

协程

什么是协程?

协程(Coroutine)是一种比线程更轻量级的“并发”方式。它允许程序在同一个线程里“并行”地执行多个任务。这里“并行”并不是指真正的并行执行,而是协程可以在任务之间快速切换,从而让这些任务看起来像是同时进行的。

你可以把协程想象成一个大办公室里的一名员工,这名员工需要完成一些任务,比如接电话、发邮件、写报告。这些任务可能需要等一段时间才能完成,比如等电话的对方回复,等邮件发送成功,或者等等数据。但是在等待的时间里,这名员工不会闲着,他会继续去做别的任务。

  • 线程就像是一个员工每做一个任务他就需要一个独立的办公桌。线程是重量级的,需要更多资源,启动和管理也更复杂。
  • 协程就像是一个员工在同一个办公桌上同时处理多个任务,快速切换。协程是轻量级的,消耗的资源很少,启动和管理也比较简单。

基本使用

  • 定义协程函数:使用 async def 关键字定义一个协程函数。
async def my_coroutine():
    pass
  • 运行协程:可以使用 await 关键词等待另一个协程完成,或使用 asyncio.run() 来运行最顶层的入口点协程。
import asyncio


async def my_coroutine():
    print("这是使用await运行的协程")


async def main():
    print("使用asyncio.run运行协程开始")
    await my_coroutine()
    print("使用asyncio.run运行协程结束")

asyncio.run(main())


"""
输出:
使用asyncio.run运行协程开始
这是使用await运行的协程
使用asyncio.run运行协程结束
"""

注意:简单地调用一个协程并不会使其被调度执行

# python console 中运行
async def my_coroutine():
    print("my coroutine")
    
my_coroutine()
<coroutine object my_coroutine at 0x104519f50>

运行协程

  • 使用asyncio.run() 函数用来运行最顶层的入口点 “main()” 函数 (见上面的示例)

  • 使用await执行(以下代码段会在等待3秒后打印 "1号协程"结束时间,然后再次等待5秒后打印 "2号协程"完成时间)

    import asyncio
    import time
    
    
    async def my_coroutine(name, delay):
        await asyncio.sleep(delay)  # 模拟I/O操作
        print(f"{name}完成时间:{time.time()}")
    
    
    async def main():
        start_time = time.time()
        print(f"开始时间 {start_time}")
    
        await my_coroutine("1号协程", 3)
        await my_coroutine("2号协程", 5)
    
        end_time = time.time()
        print(f"结束时间 {end_time}")
        print(f"耗时{end_time - start_time:.2f}秒")
    
    
    asyncio.run(main())
    
    """
    输出:
    开始时间 1726210238.44529
    1号协程完成时间:1726210241.446931
    2号协程完成时间:1726210246.4494321
    结束时间 1726210246.449468
    耗时8.00秒
    """
    
  • asyncio.create_task() 函数用来并发运行作为 asyncio 任务的多个协程。(修改以上示例,并发运行两个协程)

    import asyncio
    import time
    
    
    async def my_coroutine(name, delay):
        await asyncio.sleep(delay)  # 模拟I/O操作
        print(f"{name}完成时间:{time.time()}")
    
    
    async def main():
        start_time = time.time()
        print(f"开始时间 {start_time}")
        task1 = asyncio.create_task(my_coroutine("1号协程", 3))
        task2 = asyncio.create_task(my_coroutine("2号协程", 5))
        await task1
        await task2
        end_time = time.time()
        print(f"结束时间 {end_time}")
        print(f"耗时{end_time - start_time:.2f}秒")
    
    
    asyncio.run(main())
    
    
    """
    输出:
    开始时间 1726210659.84198
    1号协程完成时间:1726210662.843559
    2号协程完成时间:1726210664.842549
    结束时间 1726210664.84267
    耗时5.00秒
    """
    # 可以明显看出总耗时比之前明显快了3秒,1和2之间的时间间隔也变成了2秒
    
  • asyncio.TaskGroup 类提供了 create_task() 的替代。 使用此 API,之前的例子可以改为

    async def main():
        start_time = time.time()
        print(f"开始时间 {start_time}")
    
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(my_coroutine("1号协程", 3))
            task2 = tg.create_task(my_coroutine("2号协程", 5))
    
        end_time = time.time()
        print(f"结束时间 {end_time}")
        print(f"耗时{end_time - start_time:.2f}秒")
    

Task

什么是 Task?

在 asyncio 中,Task 是对协程进行调度管理的对象。Task 实际上是 asyncio 事件循环的一个抽象概念,通过 Task 我们可以控制协程(coroutine)的执行,允许它们并发运行。在底层,Task 使用事件循环调度多个协程,使得它们似乎是同时运行的。

asyncio.Task 对象可以被看作是 Future 的一种特化,用于运行 Python 协程。它们被设计用来在事件循环中调度和运行协程。
Task对象是非线程安全的,意味着它们主要用于单线程的 asyncio 事件循环。

创建 Task

  • 使用 asyncio.create_task() 方法,
    asyncio.create_task() 是创建 Task 的最常见方法,它会立即调度协程的运行并返回一个 Task 对象:

    import asyncio
    import time
    
    
    async def my_coroutine():
        print(f"协程开始时间 {time.strftime('%X')}")
        await asyncio.sleep(2)
        print(f"协程结束时间 {time.strftime('%X')}")
    
    
    async def main():
        print(f"主协程开始时间:{time.strftime('%X')}")
    
        # 创建一个 Task
        task = asyncio.create_task(my_coroutine())
    
        print(f"任务创建时间:{time.strftime('%X')}")
    
        # 稍微等待一下,但不会 await Task
        await asyncio.sleep(5)
    
        print(f"延时结束,开始等待任务:{time.strftime('%X')}")
    
        # 现在等待 Task 完成
        await task
    
        print(f"主协程结束时间:{time.strftime('%X')}")
    
    
    # 运行主协程
    asyncio.run(main())
    
    """
    输出:
    主协程开始时间:17:45:30
    任务创建时间:17:45:30
    协程开始时间 17:45:30
    协程结束时间 17:45:32
    延时结束,开始等待任务:17:45:35
    主协程结束时间:17:45:35
    """
    

    在上面的示例中,asyncio.create_task(my_coroutine()) 创建了一个 Task,它会立即开始运行 my_coroutine 协程,即使我还并没有执行await。

  • 使用 loop.create_task() 方法

    我们还可以通过获取事件循环,然后调用它的 create_task 方法来创建任务:

    import asyncio
    import time
    
    
    async def my_coroutine():
        print(f"协程开始时间 {time.strftime('%X')}")
        await asyncio.sleep(2)
        print(f"协程结束时间 {time.strftime('%X')}")
    
    
    async def main():
        print(f"主协程开始时间:{time.strftime('%X')}")
    
        # 创建一个 Task
        loop = asyncio.get_running_loop()
        task = loop.create_task(my_coroutine())
    
        print(f"任务创建时间:{time.strftime('%X')}")
    
        # 稍微等待一下,但不会 await Task
        await asyncio.sleep(5)
    
        print(f"延时结束,开始等待任务:{time.strftime('%X')}")
    
        # 现在等待 Task 完成
        await task
    
        print(f"主协程结束时间:{time.strftime('%X')}")
    
    
    # 运行主协程
    asyncio.run(main())
    
    """
    输出:
    主协程开始时间:17:50:03
    任务创建时间:17:50:03
    协程开始时间 17:50:03
    协程结束时间 17:50:05
    延时结束,开始等待任务:17:50:08
    主协程结束时间:17:50:08
    """
    

    在上面的示例中,使用asyncio.get_running_loop() 获取当前正在运行的事件循环,loop.create_task(my_coroutine()) 使用事件循环的 create_task 方法创建并调度一个协程任务。
    asyncio.create_task()loop.create_task()的不同之处:

    • asyncio.create_task():是一个便捷方法,直接通过当前的默认事件循环创建任务
    • loop.create_task():需要明确提供事件循环,适用于更复杂或特定需求的场景,比如管理多个事件循环。
  • 使用 asyncio.ensure_future()
    虽然不如前两种方法常用,但 asyncio.ensure_future 也可以用来创建 Task。它可以接受协程或 Future 对象,并确保返回一个 Task:

    import asyncio
    import time
    
    
    async def my_coroutine():
        print(f"协程开始时间 {time.strftime('%X')}")
        await asyncio.sleep(2)
        print(f"协程结束时间 {time.strftime('%X')}")
    
    
    async def main():
        print(f"主协程开始时间:{time.strftime('%X')}")
    
        # 创建一个 Task
        task = asyncio.ensure_future(my_coroutine())
    
        print(f"任务创建时间:{time.strftime('%X')}")
    
        # 稍微等待一下,但不会 await Task
        await asyncio.sleep(5)
    
        print(f"延时结束,开始等待任务:{time.strftime('%X')}")
    
        # 现在等待 Task 完成
        await task
    
        print(f"主协程结束时间:{time.strftime('%X')}")
    
    
    # 运行主协程
    asyncio.run(main())
    
    """
    输出:
    主协程开始时间:18:00:54
    任务创建时间:18:00:54
    协程开始时间 18:00:54
    协程结束时间 18:00:56
    延时结束,开始等待任务:18:00:59
    主协程结束时间:18:00:59
    """
    

    asyncio.ensure_future() 是一个功能强大的函数,常用于将一个协程转换为一个 Future 对象。
    它在处理异步任务时提供了更多的灵活性,特别是在需要将协程包装为 Future 时。
    asyncio.create_task()asyncio.ensure_future() 的不同之处:

    • asyncio.create_task():专门用于将协程转换为 Task,只能处理协程对象。
    • asyncio.ensure_future():可以处理协程对象和 Future 对象,更加通用,适用于更多场景。

取消 Task

asyncio 中,当一个 Task 对象的 cancel() 方法被调用时,它会请求取消该任务。具体步骤如下:

  1. 标记任务为取消状态:调用 cancel() 方法后,任务会被标记为取消状态。
  2. 抛出 CancelledError 异常:再次调度这个任务时,它会在等待的位置抛出一个 asyncio.CancelledError 异常。
  3. 任务处理异常:协程内部可以捕获这个异常,进行相应的清理操作。
import asyncio


async def cancellable_task():
    try:
        print("Task 启动")
        await asyncio.sleep(10)  # 长时间任务
        print("Task 完成")
    except asyncio.CancelledError:
        print("Task 被取消")
        raise  # 重新抛出err,以便外部可以检测到任务已被取消


async def main():
    task = asyncio.create_task(cancellable_task())
    await asyncio.sleep(2)  # 等待一段时间
    task.cancel()  # 请求取消任务

    try:
        await task
    except asyncio.CancelledError:
        print("主协程: Task 被取消")


asyncio.run(main())

"""
输出:
Task 启动
Task 被取消
主协程: Task 被取消
"""

在这个例子中,main() 协程启动了一个长时间运行的任务 cancellable_task() 并在2秒后请求取消它。

Task 异常获取

asyncio 中,Task 对象继承了 Future 对象的许多方法和属性,其中包括 exception() 方法。exception() 用于获取任务在执行过程中抛出的异常。如果任务完成且没有异常发生,exception() 返回 None。如果任务还未完成,调用 exception() 将会引发 asyncio.InvalidStateError 异常。因此,通常我们需要在任务完成之后调用 exception() 方法。

import asyncio


async def my_task():
    await asyncio.sleep(1)
    raise ValueError("任务执行出错")


async def main():
    task = asyncio.create_task(my_task())

    try:
        await task
    except Exception as e:
        print(f"主协程中捕获异常: {e}")

    # 现在任务已经完成,可以检查异常
    if task.exception():
        print(f"任务结束通过exception方法检查异常: {task.exception()}")


asyncio.run(main())

"""
输出:
主协程中捕获异常: 任务执行出错
任务结束通过exception方法检查异常: 任务执行出错
"""

在这个示例中,my_task() 会抛出一个 ValueError 异常。我们在主协程 main() 中捕获该异常,同时也通过 exception() 方法再次获取并打印异常。

Task 回调

add_done_callback() 方法是 asyncio 提供的一个强大的工具,允许我们在任务完成后执行特定的回调函数。回调函数帮助我们更有效地管理任务的生命周期,处理结果和异常,并执行一些后续操作。

import asyncio


async def my_task():
    await asyncio.sleep(1)  # 模拟一些异步操作
    return "一键三连"


def task_done_callback(future):
    print(f"回调-任务已完成,结果: {future.result()}")


async def main():
    task = asyncio.create_task(my_task())
    task.add_done_callback(task_done_callback)
    await task  # 等待任务完成


asyncio.run(main())


"""
输出:
回调-任务已完成,结果: 一键三连
"""

在这个例子中,task_done_callback 回调函数会在 my_task 任务完成后被调用,并打印任务的结果。

import asyncio


async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("任务出现了奇怪的错误")


def task_done_callback(future):
    if future.exception():
        print(f"回调-任务失败,异常: {future.exception()}")
    else:
        print(f"回调-任务完成,结果: {future.result()}")


async def main():
    task = asyncio.create_task(failing_task())
    task.add_done_callback(task_done_callback)
    try:
        await task
    except Exception as e:
        print(f"主协程捕获异常: {e}")


asyncio.run(main())


"""
输出:
回调-任务失败,异常: 任务出现了奇怪的错误
主协程捕获异常: 任务出现了奇怪的错误
"""

在这个示例中,当 failing_task 抛出异常时,task_done_callback 会检测并打印异常,而主协程也会捕获并处理该异常。

TaskGroup

什么是 TaskGroup?

TaskGroup 是 Python 3.11 中新增的 asyncio 组件。它提供了一种更简洁、更安全的方式来管理多个并发任务。TaskGroup 是一个上下文管理器,当与 async with 语句一起使用时,它允许我们在一个块内启动多个任务,并确保这些任务在上下文管理器退出时正确清理。

为什么使用 TaskGroup?

  1. 更简洁的语法:在没有 TaskGroup 之前,管理多个任务通常需要手动创建每个任务并在最后通过 await 语句等待所有任务完成。TaskGroup 简化了这一过程。
  2. 更好的错误处理:由于 TaskGroup 是一个上下文管理器,它更容易管理任务中的异常情况。
  3. 更清晰的结构:代码的可读性和结构性得到了显著提升。

创建任务

TaskGroup 中创建任务使用 create_task 方法。每个任务会立即调度,并在 TaskGroup 的管理范围内运行。

import asyncio
import time


async def task(n):
    print(f"任务 {n} 启动 {time.strftime('%X')}")
    await asyncio.sleep(2)
    print(f"任务 {n} 结束 {time.strftime('%X')}")


async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task(1))
        tg.create_task(task(2))
        tg.create_task(task(3))


asyncio.run(main())


"""
输出:
任务 1 启动 17:08:18
任务 2 启动 17:08:18
任务 3 启动 17:08:18
任务 1 结束 17:08:20
任务 2 结束 17:08:20
任务 3 结束 17:08:20
"""

在这个示例中,我们通过 async with 语句创建了一个 TaskGroup,并使用 create_task 方法启动了三个并行运行的任务 task(n),这三个任务立即调度,并在 TaskGroup 的管理范围内运行。当所有任务完成时,TaskGroup 会自动进行清理。

异常处理

TaskGroup 中的任务引发异常时,异常会在退出 async with 块时处理。如果多个任务引发异常,TaskGroup 会聚合这些异常,并引发一个 ExceptionGroup 异常。

import asyncio


async def error_task():
    raise RuntimeError("一些奇怪的错误")


async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(error_task())
            tg.create_task(error_task())
            tg.create_task(error_task())
    except ExceptionGroup as e:
        print("任务组捕获异常: ", e)


asyncio.run(main())


"""
输出:
任务组捕获异常:  unhandled errors in a TaskGroup (3 sub-exceptions)
"""

在使用 asyncio.TaskGroup 时,如果多个任务引发异常,异常会被聚合成一个 ExceptionGroup 异常,并在 TaskGroup 上下文管理器退出时被捕获和处理。然而,默认情况下,ExceptionGroup 只提供较为简略的信息。要看到具体的子异常信息,我们需要更详细地打印 ExceptionGroup 对象。

import asyncio


async def error_task():
    raise RuntimeError("一些奇怪的错误")


async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(error_task())
            tg.create_task(error_task())
            tg.create_task(error_task())
    except ExceptionGroup as e:
        print("任务组捕获异常:")
        for sub_exception in e.exceptions:
            print(f"子异常: {sub_exception}")


asyncio.run(main())


"""
输出:
任务组捕获异常:
子异常: 一些奇怪的错误
子异常: 一些奇怪的错误
子异常: 一些奇怪的错误
"""

在上面的示例中,我们在捕获 ExceptionGroup 异常后,迭代其 exceptions 属性,逐个打印出子异常的信息。这样可以更全面了解 ExceptionGroup 中包含的所有异常。

同步任务完成

TaskGroup 保证所有任务在同一个上下文管理器范围内完成。如果某个任务需要较长时间完成,其他任务会等待它。

import asyncio


async def long_task():
    await asyncio.sleep(5)
    print("长任务完成")


async def short_task():
    await asyncio.sleep(1)
    print("短任务完成")


async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(long_task())
        tg.create_task(short_task())
    print("所有任务完成")


asyncio.run(main())

"""
输出:
短任务完成
长任务完成
所有任务完成
"""

上面的示例展示了如何使用 asyncio.TaskGroup 同时管理多个异步任务,其中短任务先完成并输出结果,长任务随后完成,最终确保所有任务结束后输出 “所有任务完成”。


原文地址:https://blog.csdn.net/youngwyj/article/details/142139046

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!