文章
async协程操作
工具类utils.py
import time
from functools import wraps
def async_timed(func):
@wraps(func)
async def wrapped(*args, **kwargs):
print(f"开始执行{func},参数为:{args},{kwargs}")
start = time.time()
try:
return await func(*args, **kwargs)
finally:
end = time.time()
total = end - start
print(f"结束执行{func},耗时:{total:.4f}秒")
return wrapped
test01.py
import asyncio
from async_demo.utils import async_timed
# 通过在函数前面加上async关键字,将一个普通的函数变为一个协程
@async_timed
async def main():
print("Hello")
# 协程必须要等待,也就是必须要在前面加上await关键字
await asyncio.sleep(1)
# 在协程中,对于那些会发生阻塞的I/O代码,一定不能使用同步的,否则程序就会阻塞在这个同步代码,失去并发性
print("World")
if __name__ == '__main__':
# 创建协程对象,main()并不是直接执行main函数,而是创建一个协程
cor = main()
# 需要把协程放到事件循环中才会执行
asyncio.run(cor)
test02.py
import asyncio
from utils import async_timed
async def greet(name, delay):
await asyncio.sleep(delay)
return f"Hello, {name}"
@async_timed
async def main():
# 并发运行,必须要将协程包装成Task对象,才能够并发执行
# Task对象是用于封装和管理协程运行的,可以将协程并发执行
# done() 用于获取该Task对象是否执行完成(正常完成、异常、被取消都算done)
# result() 用于获取该Task执行完成后的返回值
# exception() 如果Task对象执行过程中发生异常则该方法会返回异常信息。没发生异常则抛出Exception is not set
# 注意必须要把所有任务都创建好再分别await才能并发运行,不能创建任务后立即await这样会导致程序同步执行
task1 = asyncio.create_task(greet('xx', 2))
# create_task创建完任务后 这个任务会立马被加入到事件循环中进行调度了
task2 = asyncio.create_task(greet('yy', 3))
# 创建好的任务进行await
result1 = await task1
print(result1)
result2 = await task2
print(result2)
if __name__ == '__main__':
asyncio.run(main())
test03.py
import asyncio
from utils import async_timed
async def greet(name, delay):
await asyncio.sleep(delay)
if name == "xx":
raise ValueError("执行错误!")
return f"Hello, {name}"
@async_timed
async def main():
try:
async with asyncio.TaskGroup() as group:
# task对象
# done()用于获取该Task对象是否执行完成
# result()用于获取该Task执行完后的返回值
# exception()如果Task对象执行过程中发生异常,则该方法会返回异常信息
task1 = group.create_task(greet("xx", 1))
task2 = group.create_task(greet("yy", 2))
task3 = group.create_task(greet("zz", 3))
except Exception as e:
print(e)
# 所有任务都正常运行完了
# print(task1.result())
# print(task2.result())
# 其中有任务出现异常了 导致后面的任务被取消 从而提前退出协程的并发运行
print(task1.cancelled())
print(task2.done())
print(task3.done())
if __name__ == '__main__':
# TaskGroup中的任务,如果在运行期间有任何一个任务抛出异常,那么其他任务将全部取消。
asyncio.run(main())
test04.py
import asyncio
from utils import async_timed
async def greet(name, delay):
await asyncio.sleep(delay)
return f"Hello, {name}"
@async_timed
async def main():
# 与TaskGroup相比:asyncio.gather函数即使其中有任务抛出异常,也不会取消后面的任务
# 与as_completed相比:asyncio.gather会等待所有任务都执行完才会返回,而as_completed是执行完一个任务后就立即返回
results = await asyncio.gather(
greet("xx", 2),
greet("yy", 1),
greet("zz", 3),
)
tasks = asyncio.all_tasks()
for task in tasks:
print(task.get_name(), task.cancelled())
print(results)
if __name__ == '__main__':
asyncio.run(main())
test05.py
import asyncio
from utils import async_timed
async def greet(name, delay):
await asyncio.sleep(delay)
return f"Hello, {name}"
async def main():
aws = [
greet("xx", 2),
greet("yy", 1)
]
# 并发执行aws中的协程,并且可以通过遍历的形式进行等待 最先执行完先返回结果
# 其中某个任务抛出异常后,剩余的任务也不会被取消掉
# 可以指定超时时间,超过指定超时时间,还有任务没有完成,那么会抛出TimeoutError
for cor in asyncio.as_completed(aws):
result = await cor
print(result)
if __name__ == '__main__':
asyncio.run(main())
test06.py
import asyncio
from utils import async_timed
async def greet(name, delay):
await asyncio.sleep(delay)
return f"Hello, {name}"
@async_timed
async def main():
# wait_for指定一个Task运行超时时间
result = await asyncio.wait_for(greet("xx", 2), timeout=1)
print(result)
if __name__ == '__main__':
asyncio.run(main())
test07.py
import asyncio
from utils import async_timed
async def greet(name, delay):
await asyncio.sleep(delay)
return f"Hello, {name}"
@async_timed
async def main():
aws = [
asyncio.create_task(greet("xx", 1)),
asyncio.create_task(greet("yy", 3)),
]
# wait函数返回的结果是一个元组(执行完成的任务,执行超时的任务)
# return WHEN: ALL_COMPLETED
done_tasks, pending_tasks = await asyncio.wait(aws, timeout=2)
print(done_tasks)
print(pending_tasks)
for task in pending_tasks:
result = await task
print(result)
if __name__ == '__main__':
asyncio.run(main())
test08.py
import asyncio
from utils import async_timed
async def greet(name, delay):
await asyncio.sleep(delay)
return f"Hello, {name}"
@async_timed
async def main():
try:
# timeout函数返回一个异步上线文管理器 需要async with使用
# delay为具体秒数 None时代表永远不会超时
# 如果超过delay那么下面所有任务都将被取消,抛出TimeoutError异常
async with asyncio.timeout(2):
task1 = asyncio.create_task(greet("xx", 1), name="xx")
task2 = asyncio.create_task(greet("yy", 3), name="yy")
result1 = await task1
print(result1)
result2 = await task2
print(result2)
except asyncio.TimeoutError:
print("TimeoutError")
tasks = asyncio.all_tasks()
for task in tasks:
print(task.get_name())
if __name__ == '__main__':
asyncio.run(main())
test09.py
import asyncio
import time
from utils import async_timed
def blocking_function():
print('blocking开始')
time.sleep(2)
print('blocking结束')
return "success"
@async_timed
async def main():
print("main开始")
# 以下代码运行只需2秒,如果以下代码不使用to_thread,那么需要3秒,因为time.sleep(2)是阻塞的,会直接阻塞事件循环
results = await asyncio.gather(
asyncio.to_thread(blocking_function),
asyncio.sleep(1)
)
print(results)
print("main结束")
if __name__ == '__main__':
asyncio.run(main())
test10.py
import asyncio
from functools import partial
from utils import async_timed
async def greet(name, delay):
await asyncio.sleep(delay)
return f"Hello, {name}"
def my_callback(future, tag):
print("=" * 20)
print(type(future))
# 获取任务的返回值
print(future.result())
print("tag:", tag)
print("=" * 20)
@async_timed
async def main():
task = asyncio.create_task(greet("xx", 2))
task.add_done_callback(partial(my_callback, tag="yy"))
await task
if __name__ == '__main__':
asyncio.run(main())