一步一步从初级到高级的介绍一下asyncio 的用法
asyncio
是 Python 用来处理异步编程的库,简单说就是让程序在“等东西”的时候不傻乎乎干等着,而是去干点别的活儿,提高效率。
初级:啥是异步?怎么跑起来?
1. 理解异步的核心:别干等
想象你在煮饭:水开了要等 10 分钟才能下面条。同步编程就像你傻站那儿盯着锅等 10 分钟啥也不干;异步编程呢,就是水烧着的时候你去刷个碗、切个菜,等水开了再回来下面条。asyncio 就是帮你实现这种“不干等”的工具。
2. 最简单的起步:跑个 Hello World
Python 自带 asyncio,不用装啥,直接开干。咱先写个最简单的异步代码:
import asyncio
async def say_hello():
print("Hello")
await asyncio.sleep(1) # 假装等1秒,就像烧水
print("World")
asyncio.run(say_hello())
- async def:定义一个异步函数,告诉 Python 这函数可以“暂停”。
- await:遇到 await 就先暂停,等它后面的东西(比如 sleep)干完再继续。
- asyncio.run():启动异步程序的入口。
跑一下,输出是:
Hello
(等1秒)
World
这就是异步的基本味儿:程序不会卡死,能“暂停”再“继续”。
3. 多干点活儿:跑多个任务
光一个任务没啥意思,异步的牛逼之处是能同时干多个事。比如:
import asyncio
async def task1():
print("任务1开始")
await asyncio.sleep(2)
print("任务1结束")
async def task2():
print("任务2开始")
await asyncio.sleep(1)
print("任务2结束")
async def main():
await asyncio.gather(task1(), task2()) # 一起跑
asyncio.run(main())
输出:
任务1开始
任务2开始
任务2结束 # 1秒后
任务1结束 # 2秒后
asyncio.gather():把多个任务塞一块儿跑,类似于“烧水的时候切菜”。
任务1等2秒,任务2等1秒,但它们是一起开始的,总时间只花了2秒,而不是3秒(2+1)。
中级:整点实用的活儿
1. 加点交互:模拟网络请求
现实中,异步最常用在等网络请求(比如爬网页、调接口)。咱用 sleep 模拟一下延迟:
import asyncio
async def fetch_data(name, delay):
print(f"{name} 开始请求数据")
await asyncio.sleep(delay) # 模拟网络延迟
print(f"{name} 数据拿到啦")
return f"{name} 的结果"
async def main():
results = await asyncio.gather(
fetch_data("网页A", 2),
fetch_data("网页B", 1),
fetch_data("网页C", 3)
)
print("所有结果:", results)
asyncio.run(main())
输出:
网页A 开始请求数据
网页B 开始请求数据
网页C 开始请求数据
网页B 数据拿到啦 # 1秒
网页A 数据拿到啦 # 2秒
网页C 数据拿到啦 # 3秒
所有结果: ['网页A 的结果', '网页B 的结果', '网页C 的结果']
这里模拟了三个“请求”,每个耗时不同,但总耗时是3秒(最长的那个),而不是6秒(2+1+3)。
2. 加个超时:别等太久
有时候网络太慢,咱得加个“等不及就放弃”的逻辑:
import asyncio
async def slow_task():
await asyncio.sleep(5)
return "终于好了"
async def main():
try:
result = await asyncio.wait_for(slow_task(), timeout=2) # 只等2秒
print(result)
except asyncio.TimeoutError:
print("等不及了,超时了!")
asyncio.run(main())
输出:
等不及了,超时了!
asyncio.wait_for():设置超时,超过时间就抛异常。
高级:整复杂点的活儿
1. 任务调度:手动控制谁先跑
有时候你不想全扔一块儿跑,而是手动控制任务。可以用 asyncio.create_task():
import asyncio
async def task1():
await asyncio.sleep(1)
print("任务1完成")
async def task2():
await asyncio.sleep(2)
print("任务2完成")
async def main():
t1 = asyncio.create_task(task1()) # 创建任务,马上跑
t2 = asyncio.create_task(task2())
print("任务都安排上了")
await t1 # 等任务1完成
print("任务1好了,继续干别的")
await t2 # 再等任务2
asyncio.run(main())
输出:
任务都安排上了
任务1完成 # 1秒
任务1好了,继续干别的
任务2完成 # 2秒
create_task():把任务扔到后台跑,马上返回一个“任务对象”,你可以随时 await 它。
2. 队列和生产者-消费者模式
异步还能玩“流水线”,比如一个任务生产数据,另一个消费数据:
import asyncio
import random
async def producer(queue):
for i in range(5):
await asyncio.sleep(1)
item = f"货物{i}"
print(f"生产了 {item}")
await queue.put(item) # 放进队列
async def consumer(queue):
while True:
item = await queue.get() # 从队列拿东西
print(f"消费了 {item}")
await asyncio.sleep(random.uniform(0.5, 1.5)) # 随机耗时
queue.task_done() # 标记任务完成
async def main():
queue = asyncio.Queue() # 创建队列
await asyncio.gather(producer(queue), consumer(queue))
asyncio.run(main())
输出(随机时间会有变化):
生产了 货物0
消费了 货物0
生产了 货物1
消费了 货物1
生产了 货物2
asyncio.Queue():异步队列,生产者放东西,消费者拿东西,适合多人协作的场景。
3. 异常处理:别崩了
异步任务多了,崩了咋办?加个异常处理:
import asyncio
async def risky_task():
await asyncio.sleep(1)
raise ValueError("我崩了!")
async def main():
try:
await asyncio.gather(risky_task(), return_exceptions=True) # 捕获异常
except ValueError as e:
print(f"抓到错误:{e}")
asyncio.run(main())
输出:
抓到错误:我崩了!
return_exceptions=True:让 gather 不直接抛异常,而是返回错误对象。
高级用法 1:事件循环(Event Loop)手动操控
asyncio.run() 其实是个“懒人封装”,背后真正的“大脑”是事件循环(Event Loop)。高级场景下,你可能需要自己掌控这个循环。
啥是事件循环? 事件循环就像个“任务调度员”,负责安排谁跑、谁暂停、啥时候继续。默认 asyncio.run() 会帮你启动和关闭它,但你也可以手动操作。
手动跑循环
import asyncio
async def say_hi():
print("Hi")
await asyncio.sleep(1)
print("Bye")
loop = asyncio.get_event_loop() # 拿到事件循环
loop.run_until_complete(say_hi()) # 跑一个任务
loop.close() # 关掉循环
geteventloop():拿到当前的事件循环。 rununtilcomplete():跑某个异步任务直到结束。
为啥要手动? 比如你想在一个程序里跑多个阶段的任务,或者在循环里塞点非异步的代码:
import asyncio
async def task(name):
print(f"{name} 开始")
await asyncio.sleep(1)
print(f"{name} 结束")
loop = asyncio.get_event_loop()
loop.run_until_complete(task("第一波")) # 第一阶段
print("中间干点别的")
loop.run_until_complete(task("第二波")) # 第二阶段
loop.close()
输出:
第一波 开始
第一波 结束
中间干点别的
第二波 开始
第二波 结束
注意:一个循环只能用一次,关了就不能再开。想多次跑,得用 asyncio.neweventloop() 创建新的。
高级点:自定义调度
可以用 callsoon 或 calllater 在循环里插队跑普通函数:
import asyncio
def normal_func():
print("我是一个普通函数")
async def main():
loop = asyncio.get_running_loop() # 获取当前循环
loop.call_later(1, normal_func) # 1秒后跑普通函数
await asyncio.sleep(2) # 等2秒,确保看到效果
asyncio.run(main())
输出:
(等1秒)
我是一个普通函数
(再等1秒程序结束)
高级用法 2:异步上下文管理器
有时候你需要初始化和清理资源(比如数据库连接),异步版本的 with 语句能帮你。
自定义异步上下文
import asyncio
class AsyncResource:
async def __aenter__(self):
print("资源初始化")
await asyncio.sleep(1) # 模拟异步初始化
return self
async def __aexit__(self, exc_type, exc, tb):
print("资源清理")
await asyncio.sleep(1) # 模拟异步清理
async def main():
async with AsyncResource() as r:
print("使用资源中")
await asyncio.sleep(1)
asyncio.run(main())
输出:
资源初始化
(等1秒)
使用资源中
(等1秒)
资源清理
(等1秒)
aenter:进入时异步初始化。 aexit:退出时异步清理。
实际用处 比如异步文件操作(用 aiofiles 库):
import aiofiles
import asyncio
async def write_file():
async with aiofiles.open("test.txt", "w") as f:
await f.write("异步写入文件")
print("写完了")
asyncio.run(write_file())
aiofiles 是异步文件操作库,搭配 asyncio 用,避免阻塞。
高级用法 3:信号处理与任务取消
异步程序跑着跑着,可能得中途停下来,或者响应外部信号(比如 Ctrl+C)。
取消任务
import asyncio
async def long_task():
try:
print("开始长任务")
await asyncio.sleep(10)
print("长任务结束")
except asyncio.CancelledError:
print("任务被取消了!")
raise # 抛出异常,确保清理
async def main():
task = asyncio.create_task(long_task())
await asyncio.sleep(2) # 等2秒
task.cancel() # 取消任务
try:
await task # 等任务被取消
except asyncio.CancelledError:
print("取消确认")
asyncio.run(main())
输出:
开始长任务
(等2秒)
任务被取消了!
取消确认
task.cancel():取消某个任务。 CancelledError:捕获取消时的异常。
响应信号 比如优雅地停止程序:
import asyncio
import signal
async def worker():
while True:
print("我在干活")
await asyncio.sleep(1)
def stop_loop(loop):
print("收到停止信号,关机啦")
loop.stop()
async def main():
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, stop_loop, loop) # 捕获 Ctrl+C
await worker()
asyncio.run(main())
跑起来后按 Ctrl+C:
我在干活
我在干活
^C收到停止信号,关机啦
高级用法 4:异步迭代器与生成器
异步还能玩“边等边拿”的迭代器,适合处理流式数据。
异步迭代器
import asyncio
class AsyncCounter:
def __init__(self, stop):
self.current = 0
self.stop = stop
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(1) # 模拟异步等待
self.current += 1
return self.current - 1
async def main():
async for num in AsyncCounter(3):
print(f"拿到数字:{num}")
asyncio.run(main())
输出:
(等1秒)
拿到数字:0
(等1秒)
拿到数字:1
(等1秒)
拿到数字:2
async for:异步迭代,边等边处理。
异步生成器 更简单点的写法:
import asyncio
async def async_gen():
for i in range(3):
await asyncio.sleep(1)
yield i # 异步生成值
async def main():
async for num in async_gen():
print(f"生成:{num}")
asyncio.run(main())
输出跟上面一样。异步生成器用 yield 产出值,适合流式处理大数据。
高级用法 5:线程与异步混用
有时候你得跑点阻塞的同步代码(比如第三方库不支持异步),可以用 runinexecutor 扔到线程里。
import asyncio
import time
def blocking_task():
time.sleep(2) # 模拟阻塞
return "阻塞任务完成"
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_task) # 扔到默认线程池
print(result)
asyncio.run(main())
输出:
(等2秒)
阻塞任务完成
runinexecutor(None, 函数):把同步函数扔到线程池跑,不卡主循环。
文档信息
版权声明:可自由转载(请注明转载出处)-非商用-非衍生
发表时间:2025年7月1日 09:53