大数据

async协程操作

工具类utils.py

import time
from functools import wraps


def async_timed(func):
    @wraps(func)
    async def wrapped(*args, **kwargs):
        print(f"开始执行{func},参数为:{args}{kwargs}")
        start = time.time()
        try:
            return await func(*args, **kwargs)
        finally:
            end = time.time()
            total = end - start
            print(f"结束执行{func},耗时:{total:.4f}秒")

    return wrapped

test01.py

import asyncio
from async_demo.utils import async_timed


# 通过在函数前面加上async关键字,将一个普通的函数变为一个协程
@async_timed
async def main():
    print("Hello")
    # 协程必须要等待,也就是必须要在前面加上await关键字
    await asyncio.sleep(1)
    # 在协程中,对于那些会发生阻塞的I/O代码,一定不能使用同步的,否则程序就会阻塞在这个同步代码,失去并发性
    print("World")


if __name__ == '__main__':
    # 创建协程对象,main()并不是直接执行main函数,而是创建一个协程
    cor = main()
    # 需要把协程放到事件循环中才会执行
    asyncio.run(cor)

test02.py

import asyncio

from utils import async_timed


async def greet(name, delay):
    await asyncio.sleep(delay)
    return f"Hello, {name}"


@async_timed
async def main():
    # 并发运行,必须要将协程包装成Task对象,才能够并发执行
    # Task对象是用于封装和管理协程运行的,可以将协程并发执行
    #   done() 用于获取该Task对象是否执行完成(正常完成、异常、被取消都算done)
    #   result() 用于获取该Task执行完成后的返回值
    #   exception() 如果Task对象执行过程中发生异常则该方法会返回异常信息。没发生异常则抛出Exception is not set
    # 注意必须要把所有任务都创建好再分别await才能并发运行,不能创建任务后立即await这样会导致程序同步执行
    task1 = asyncio.create_task(greet('xx', 2))
    # create_task创建完任务后 这个任务会立马被加入到事件循环中进行调度了
    task2 = asyncio.create_task(greet('yy', 3))

    # 创建好的任务进行await
    result1 = await task1
    print(result1)
    result2 = await task2
    print(result2)

if __name__ == '__main__':
    asyncio.run(main())

test03.py

import asyncio

from utils import async_timed


async def greet(name, delay):
    await asyncio.sleep(delay)
    if name == "xx":
        raise ValueError("执行错误!")
    return f"Hello, {name}"


@async_timed
async def main():
    try:
        async with asyncio.TaskGroup() as group:
            # task对象
            #     done()用于获取该Task对象是否执行完成
            #     result()用于获取该Task执行完后的返回值
            #     exception()如果Task对象执行过程中发生异常,则该方法会返回异常信息
            task1 = group.create_task(greet("xx", 1))
            task2 = group.create_task(greet("yy", 2))
            task3 = group.create_task(greet("zz", 3))
    except Exception as e:
        print(e)
    # 所有任务都正常运行完了
    # print(task1.result())
    # print(task2.result())

    # 其中有任务出现异常了 导致后面的任务被取消 从而提前退出协程的并发运行
    print(task1.cancelled())
    print(task2.done())
    print(task3.done())


if __name__ == '__main__':
    # TaskGroup中的任务,如果在运行期间有任何一个任务抛出异常,那么其他任务将全部取消。
    asyncio.run(main())

test04.py

import asyncio

from utils import async_timed


async def greet(name, delay):
    await asyncio.sleep(delay)
    return f"Hello, {name}"


@async_timed
async def main():
    # 与TaskGroup相比:asyncio.gather函数即使其中有任务抛出异常,也不会取消后面的任务
    # 与as_completed相比:asyncio.gather会等待所有任务都执行完才会返回,而as_completed是执行完一个任务后就立即返回
    results = await asyncio.gather(
        greet("xx", 2),
        greet("yy", 1),
        greet("zz", 3),
    )
    tasks = asyncio.all_tasks()
    for task in tasks:
        print(task.get_name(), task.cancelled())
    print(results)


if __name__ == '__main__':
    asyncio.run(main())

test05.py

import asyncio

from utils import async_timed


async def greet(name, delay):
    await asyncio.sleep(delay)
    return f"Hello, {name}"


async def main():
    aws = [
        greet("xx", 2),
        greet("yy", 1)
    ]
    # 并发执行aws中的协程,并且可以通过遍历的形式进行等待 最先执行完先返回结果
    # 其中某个任务抛出异常后,剩余的任务也不会被取消掉
    # 可以指定超时时间,超过指定超时时间,还有任务没有完成,那么会抛出TimeoutError
    for cor in asyncio.as_completed(aws):
        result = await cor
        print(result)


if __name__ == '__main__':
    asyncio.run(main())

test06.py

import asyncio

from utils import async_timed


async def greet(name, delay):
    await asyncio.sleep(delay)
    return f"Hello, {name}"


@async_timed
async def main():
    # wait_for指定一个Task运行超时时间
    result = await asyncio.wait_for(greet("xx", 2), timeout=1)
    print(result)


if __name__ == '__main__':
    asyncio.run(main())

test07.py

import asyncio

from utils import async_timed


async def greet(name, delay):
    await asyncio.sleep(delay)
    return f"Hello, {name}"


@async_timed
async def main():
    aws = [
        asyncio.create_task(greet("xx", 1)),
        asyncio.create_task(greet("yy", 3)),
    ]
    # wait函数返回的结果是一个元组(执行完成的任务,执行超时的任务)
    # return WHEN: ALL_COMPLETED
    done_tasks, pending_tasks = await asyncio.wait(aws, timeout=2)
    print(done_tasks)
    print(pending_tasks)
    for task in pending_tasks:
        result = await task
        print(result)


if __name__ == '__main__':
    asyncio.run(main())

test08.py

import asyncio

from utils import async_timed


async def greet(name, delay):
    await asyncio.sleep(delay)
    return f"Hello, {name}"


@async_timed
async def main():
    try:
        # timeout函数返回一个异步上线文管理器 需要async with使用
        # delay为具体秒数 None时代表永远不会超时
        # 如果超过delay那么下面所有任务都将被取消,抛出TimeoutError异常
        async with asyncio.timeout(2):
            task1 = asyncio.create_task(greet("xx", 1), name="xx")
            task2 = asyncio.create_task(greet("yy", 3), name="yy")

            result1 = await task1
            print(result1)
            result2 = await task2
            print(result2)
    except asyncio.TimeoutError:
        print("TimeoutError")
        tasks = asyncio.all_tasks()
        for task in tasks:
            print(task.get_name())


if __name__ == '__main__':
    asyncio.run(main())

test09.py

import asyncio
import time

from utils import async_timed


def blocking_function():
    print('blocking开始')
    time.sleep(2)
    print('blocking结束')
    return "success"


@async_timed
async def main():
    print("main开始")
    # 以下代码运行只需2秒,如果以下代码不使用to_thread,那么需要3秒,因为time.sleep(2)是阻塞的,会直接阻塞事件循环
    results = await asyncio.gather(
        asyncio.to_thread(blocking_function),
        asyncio.sleep(1)
    )
    print(results)
    print("main结束")


if __name__ == '__main__':
    asyncio.run(main())

test10.py

import asyncio
from functools import partial

from utils import async_timed


async def greet(name, delay):
    await asyncio.sleep(delay)
    return f"Hello, {name}"


def my_callback(future, tag):
    print("=" * 20)
    print(type(future))
    # 获取任务的返回值
    print(future.result())
    print("tag:", tag)
    print("=" * 20)


@async_timed
async def main():
    task = asyncio.create_task(greet("xx", 2))
    task.add_done_callback(partial(my_callback, tag="yy"))
    await task


if __name__ == '__main__':
    asyncio.run(main())