Python 的高级玩法异步和线程
asyncio
Python 的 asyncio,从啥都不懂的初级到能玩转异步编程的高级用法。asyncio 是 Python 用来处理异步编程的库,简单说就是让程序在“等东西”的时候不傻乎乎干等着,而是去干点别的活儿,提高效率。咱们慢慢来,别急,一步步走。
啥是异步?怎么跑起来?
1. 理解异步的核心:别干等
想象你在煮饭:水开了要等 10 分钟才能下面条。同步编程就像你傻站那儿盯着锅等 10 分钟啥也不干;异步编程呢,就是水烧着的时候你去刷个碗、切个菜,等水开了再回来下面条。asyncio 就是帮你实现这种“不干等”的工具。
2. 最简单的起步:跑个 Hello World
Python 自带 asyncio,不用装啥,直接开干。咱先写个最简单的异步代码:
import asyncio
async def say_hello():
print("Hello")
await asyncio.sleep(1) # 假装等1秒,就像烧水
print("World")
asyncio.run(say_hello())
- async def:定义一个异步函数,告诉 Python 这函数可以“暂停”。
- await:遇到 await 就先暂停,等它后面的东西(比如 sleep)干完再继续。
- asyncio.run():启动异步程序的入口。
跑一下,输出是:
Hello
(等1秒)
World
这就是异步的基本味儿:程序不会卡死,能“暂停”再“继续”。
多干点活儿:跑多个任务
光一个任务没啥意思,异步的牛逼之处是能同时干多个事。比如:
import asyncio
async def task1():
print("任务1开始")
await asyncio.sleep(2)
print("任务1结束")
async def task2():
print("任务2开始")
await asyncio.sleep(1)
print("任务2结束")
async def main():
await asyncio.gather(task1(), task2()) # 一起跑
asyncio.run(main())
输出:
任务1开始
任务2开始
任务2结束 # 1秒后
任务1结束 # 2秒后
- asyncio.gather():把多个任务塞一块儿跑,类似于“烧水的时候切菜”。
- 任务1等2秒,任务2等1秒,但它们是一起开始的,总时间只花了2秒,而不是3秒(2+1)。
整点实用的活儿
加点交互:模拟网络请求
现实中,异步最常用在等网络请求(比如爬网页、调接口)。咱用 sleep 模拟一下延迟:
import asyncio
async def fetch_data(name, delay):
print(f"{name} 开始请求数据")
await asyncio.sleep(delay) # 模拟网络延迟
print(f"{name} 数据拿到啦")
return f"{name} 的结果"
async def main():
results = await asyncio.gather(
fetch_data("网页A", 2),
fetch_data("网页B", 1),
fetch_data("网页C", 3)
)
print("所有结果:", results)
asyncio.run(main())
输出:
网页A 开始请求数据
网页B 开始请求数据
网页C 开始请求数据
网页B 数据拿到啦 # 1秒
网页A 数据拿到啦 # 2秒
网页C 数据拿到啦 # 3秒
所有结果: ['网页A 的结果', '网页B 的结果', '网页C 的结果']
- 这里模拟了三个“请求”,每个耗时不同,但总耗时是3秒(最长的那个),而不是6秒(2+1+3)。
加个超时:别等太久
有时候网络太慢,咱得加个“等不及就放弃”的逻辑:
import asyncio
async def slow_task():
await asyncio.sleep(5)
return "终于好了"
async def main():
try:
result = await asyncio.wait_for(slow_task(), timeout=2) # 只等2秒
print(result)
except asyncio.TimeoutError:
print("等不及了,超时了!")
asyncio.run(main())
输出:
等不及了,超时了!
- asyncio.wait_for():设置超时,超过时间就抛异常。
整复杂点的活儿
任务调度:手动控制谁先跑
有时候你不想全扔一块儿跑,而是手动控制任务。可以用 asyncio.create_task()
:
import asyncio
async def task1():
await asyncio.sleep(1)
print("任务1完成")
async def task2():
await asyncio.sleep(2)
print("任务2完成")
async def main():
t1 = asyncio.create_task(task1()) # 创建任务,马上跑
t2 = asyncio.create_task(task2())
print("任务都安排上了")
await t1 # 等任务1完成
print("任务1好了,继续干别的")
await t2 # 再等任务2
asyncio.run(main())
输出:
任务都安排上了
任务1完成 # 1秒
任务1好了,继续干别的
任务2完成 # 2秒
- create_task():把任务扔到后台跑,马上返回一个“任务对象”,你可以随时 await 它。
队列和生产者-消费者模式
异步还能玩“流水线”,比如一个任务生产数据,另一个消费数据:
import asyncio
import random
async def producer(queue):
for i in range(5):
await asyncio.sleep(1)
item = f"货物{i}"
print(f"生产了 {item}")
await queue.put(item) # 放进队列
async def consumer(queue):
while True:
item = await queue.get() # 从队列拿东西
print(f"消费了 {item}")
await asyncio.sleep(random.uniform(0.5, 1.5)) # 随机耗时
queue.task_done() # 标记任务完成
async def main():
queue = asyncio.Queue() # 创建队列
await asyncio.gather(producer(queue), consumer(queue))
asyncio.run(main())
输出(随机时间会有变化):
生产了 货物0
消费了 货物0
生产了 货物1
消费了 货物1
生产了 货物2
...
- asyncio.Queue():异步队列,生产者放东西,消费者拿东西,适合多人协作的场景。
异常处理:别崩了
异步任务多了,崩了咋办?加个异常处理:
import asyncio
async def risky_task():
await asyncio.sleep(1)
raise ValueError("我崩了!")
async def main():
try:
await asyncio.gather(risky_task(), return_exceptions=True) # 捕获异常
except ValueError as e:
print(f"抓到错误:{e}")
asyncio.run(main())
输出:
抓到错误:我崩了!
- return_exceptions=True:让 gather 不直接抛异常,而是返回错误对象。
总结:从初级到高级
- 初级:学会 async/await,用 asyncio.run() 跑简单任务,理解“暂停”和“并发”。
- 中级:用 gather 跑多个任务,模拟网络请求,加超时控制。
- 高级:用 create_task 调度任务,玩队列模式,处理异常。
异步编程的核心就是“别干等”,多任务并行干活儿。实际用的时候,比如爬虫、服务器、网络请求,asyncio 配合 aiohttp(异步HTTP库)会更香。
高级用法 1:事件循环(Event Loop)手动操控
asyncio.run() 其实是个“懒人封装”,背后真正的“大脑”是事件循环(Event Loop)。高级场景下,你可能需要自己掌控这个循环。
啥是事件循环? * 事件循环就像个“任务调度员”,负责安排谁跑、谁暂停、啥时候继续。默认 asyncio.run() 会帮你启动和关闭它,但你也可以手动操作。
手动跑循环
import asyncio
async def say_hi():
print("Hi")
await asyncio.sleep(1)
print("Bye")
loop = asyncio.get_event_loop() # 拿到事件循环
loop.run_until_complete(say_hi()) # 跑一个任务
loop.close() # 关掉循环
- geteventloop():拿到当前的事件循环。
- rununtilcomplete():跑某个异步任务直到结束。
为啥要手动?
比如你想在一个程序里跑多个阶段的任务,或者在循环里塞点非异步的代码:
import asyncio
async def task(name):
print(f"{name} 开始")
await asyncio.sleep(1)
print(f"{name} 结束")
loop = asyncio.get_event_loop()
loop.run_until_complete(task("第一波")) # 第一阶段
print("中间干点别的")
loop.run_until_complete(task("第二波")) # 第二阶段
loop.close()
输出:
第一波 开始
第一波 结束
中间干点别的
第二波 开始
第二波 结束
注意:一个循环只能用一次,关了就不能再开。想多次跑,得用 asyncio.neweventloop() 创建新的。
高级点:自定义调度
可以用 callsoon 或 calllater 在循环里插队跑普通函数:
import asyncio
def normal_func():
print("我是一个普通函数")
async def main():
loop = asyncio.get_running_loop() # 获取当前循环
loop.call_later(1, normal_func) # 1秒后跑普通函数
await asyncio.sleep(2) # 等2秒,确保看到效果
asyncio.run(main())
输出:
(等1秒)
我是一个普通函数
(再等1秒程序结束)
- call_later(秒数, 函数):延迟调用普通函数,适合混合同步代码。
高级用法 2:异步上下文管理器
有时候你需要初始化和清理资源(比如数据库连接),异步版本的 with 语句能帮你。
自定义异步上下文
import asyncio
class AsyncResource:
async def __aenter__(self):
print("资源初始化")
await asyncio.sleep(1) # 模拟异步初始化
return self
async def __aexit__(self, exc_type, exc, tb):
print("资源清理")
await asyncio.sleep(1) # 模拟异步清理
async def main():
async with AsyncResource() as r:
print("使用资源中")
await asyncio.sleep(1)
asyncio.run(main())
输出:
资源初始化
(等1秒)
使用资源中
(等1秒)
资源清理
(等1秒)
*aenter:进入时异步初始化。 *aexit:退出时异步清理。
实际用处
比如异步文件操作(用 aiofiles 库):
import aiofiles
import asyncio
async def write_file():
async with aiofiles.open("test.txt", "w") as f:
await f.write("异步写入文件")
print("写完了")
asyncio.run(write_file())
- aiofiles 是异步文件操作库,搭配 asyncio 用,避免阻塞。
高级用法 3:信号处理与任务取消
异步程序跑着跑着,可能得中途停下来,或者响应外部信号(比如 Ctrl+C)。
取消任务
import asyncio
async def long_task():
try:
print("开始长任务")
await asyncio.sleep(10)
print("长任务结束")
except asyncio.CancelledError:
print("任务被取消了!")
raise # 抛出异常,确保清理
async def main():
task = asyncio.create_task(long_task())
await asyncio.sleep(2) # 等2秒
task.cancel() # 取消任务
try:
await task # 等任务被取消
except asyncio.CancelledError:
print("取消确认")
asyncio.run(main())
输出:
开始长任务
(等2秒)
任务被取消了!
取消确认
- task.cancel():取消某个任务。
- CancelledError:捕获取消时的异常。
响应信号
比如优雅地停止程序:
import asyncio
import signal
async def worker():
while True:
print("我在干活")
await asyncio.sleep(1)
def stop_loop(loop):
print("收到停止信号,关机啦")
loop.stop()
async def main():
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, stop_loop, loop) # 捕获 Ctrl+C
await worker()
asyncio.run(main())
跑起来后按 Ctrl+C:
我在干活
我在干活
^C收到停止信号,关机啦
- addsignalhandler:绑定信号(比如 SIGINT)到回调函数。
高级用法 4:异步迭代器与生成器
异步还能玩“边等边拿”的迭代器,适合处理流式数据。
异步迭代器
import asyncio
class AsyncCounter:
def __init__(self, stop):
self.current = 0
self.stop = stop
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(1) # 模拟异步等待
self.current += 1
return self.current - 1
async def main():
async for num in AsyncCounter(3):
print(f"拿到数字:{num}")
asyncio.run(main())
输出:
(等1秒)
拿到数字:0
(等1秒)
拿到数字:1
(等1秒)
拿到数字:2
- async for:异步迭代,边等边处理。
异步生成器
更简单点的写法:
import asyncio
async def async_gen():
for i in range(3):
await asyncio.sleep(1)
yield i # 异步生成值
async def main():
async for num in async_gen():
print(f"生成:{num}")
asyncio.run(main())
输出跟上面一样。异步生成器用 yield 产出值,适合流式处理大数据。
高级用法 5:线程与异步混用
有时候你得跑点阻塞的同步代码(比如第三方库不支持异步),可以用 runinexecutor 扔到线程里。
import asyncio
import time
def blocking_task():
time.sleep(2) # 模拟阻塞
return "阻塞任务完成"
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_task) # 扔到默认线程池
print(result)
asyncio.run(main())
输出:
(等2秒)
阻塞任务完成
- runinexecutor(None, 函数):把同步函数扔到线程池跑,不卡主循环。
总结:高级玩法的精髓
- 事件循环:手动控制任务调度,混合同步代码。
- 上下文管理器:优雅处理异步资源的初始化和清理。
- 任务取消与信号:让程序能随时停下来,响应外部变化。
- 异步迭代器/生成器:处理流式数据,边等边用。
- 线程混用:兼容阻塞代码,不让它拖后腿。
这些用法适合复杂场景,比如写服务器(用 asyncio 搭个 TCP/UDP 服务)、爬虫(配合 aiohttp)、实时系统(处理 WebSocket)。
threading
Python 的 threading 模块,从啥都不懂的初级到能玩转多线程的高级用法。threading 是 Python 用来处理多线程的工具,简单说就是让程序同时干多件事儿,比如一边下载文件一边播放音乐。
初级:啥是线程?怎么跑起来?
1. 理解线程:多干点活儿
想象你在饭店点餐:普通程序(单线程)就像服务员只盯着你,等你吃完再去招呼别人;多线程呢,就像饭店有好几个服务员,同时招呼好几桌客人。threading 就是帮你“雇”几个服务员,让程序并行干活。
2. 最简单的起步:跑个线程
Python 自带 threading 模块,不用装啥,直接开干。咱先写个最简单的多线程代码:
import threading
import time
def say_hello():
print("Hello")
time.sleep(1) # 假装干活1秒
print("World")
# 创建线程
t = threading.Thread(target=say_hello)
# 启动线程
t.start()
# 主线程继续干活
print("主线程不等你,我先跑了")
跑一下,输出可能是:
Hello
主线程不等你,我先跑了
(等1秒)
World
- threading.Thread(target=函数):创建一个线程,告诉它要跑哪个函数。
- start():启动线程,让它开始干活。
- 主线程(程序默认的线程)和新线程一起跑,谁先谁后看运气。
3. 等线程干完:join
有时候你想让主线程等等其他线程干完再走,用 join():
import threading
import time
def task():
print("任务开始")
time.sleep(2)
print("任务结束")
t = threading.Thread(target=task)
t.start()
t.join() # 主线程在这等着
print("线程干完了,我再跑")
输出:
任务开始
(等2秒)
任务结束
线程干完了,我再跑
- join():让主线程等着,直到这个线程干完。
4. 多线程一起跑
再加几个线程试试:
import threading
import time
def worker(name):
print(f"工人 {name} 开始干活")
time.sleep(1)
print(f"工人 {name} 干完了")
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join() # 等所有线程干完
print("所有工人下班了")
输出可能是:
工人 0 开始干活
工人 1 开始干活
工人 2 开始干活
(等1秒)
工人 0 干完了
工人 1 干完了
工人 2 干完了
所有工人下班了
- args=(i,):给线程函数传参数。
- 多个线程几乎同时启动,总耗时是1秒,而不是3秒(1+1+1)。
中级:整点实用的活儿
1. 线程安全的共享数据:锁
多线程一起跑有个坑:如果大家同时改同一块数据,可能乱套。比如计数器:
import threading
counter = 0
def add():
global counter
for _ in range(100000):
counter += 1
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)
t1.start()
t2.start()
t1.join()
t2.join()
print("计数器结果:", counter)
你可能期待结果是 200000(两个线程各加 100000),但实际可能是 180000 之类的。为什么?因为线程抢着改 counter,会“撞车”。
解决办法:加锁
import threading
counter = 0
lock = threading.Lock() # 创建锁
def add():
global counter
for _ in range(100000):
with lock: # 上锁
counter += 1
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)
t1.start()
t2.start()
t1.join()
t2.join()
print("计数器结果:", counter)
输出:
计数器结果:200000
- threading.Lock():创建一个锁。
- with lock
threading 模块里的 Event
Event 是多线程编程里的一个“信号灯”,用来让线程之间互相“喊话”,协调谁干活、谁等着。
啥是 Event?
想象你在组织一个饭局,大家都到了才能开吃。Event 就像饭店老板娘,等所有人到齐,她喊一声“开饭啦”,大家才动筷子。在线程里,Event 是一个信号工具:
- 默认是“红灯”(未设置状态,False),线程看到红灯就等着。
- 有人把信号设成“绿灯”(True),等着的人就知道可以干活了。
基础用法:红灯绿灯
- 创建和简单使用
import threading
import time
# 创建一个 Event 对象
event = threading.Event()
def waiter():
print("小弟等着老板喊开工")
event.wait() # 等绿灯
print("老板喊了,小弟开干!")
def boss():
print("老板先忙别的")
time.sleep(2) # 假装忙2秒
print("老板喊:开工啦!")
event.set() # 红灯变绿灯
# 创建线程
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=boss)
t1.start()
t2.start()
t1.join()
t2.join()
输出:
小弟等着老板喊开工
老板先忙别的
(等2秒)
老板喊:开工啦!
小弟开干!
- threading.Event():创建一个信号对象,默认是红灯(False)。
- event.wait():线程在这等着,直到信号变绿(True)。
- event.set():把信号设成绿灯,通知所有等着的人。
- 检查信号状态
可以用 is_set() 看看当前是红灯还是绿灯:
import threading
event = threading.Event()
print("信号状态:", event.is_set()) # False,红灯
event.set()
print("信号状态:", event.is_set()) # True,绿灯
输出:
信号状态:False
信号状态:True
- 关掉绿灯:clear
信号变绿后,可以用 clear() 把它变回红灯:
import threading
import time
event = threading.Event()
def worker():
print("工人等着信号")
event.wait()
print("信号绿了,干活啦")
time.sleep(1)
print("活干完了")
t = threading.Thread(target=worker)
t.start()
time.sleep(1)
print("老板点绿灯")
event.set() # 绿灯
time.sleep(2)
print("老板关绿灯")
event.clear() # 变回红灯
print("信号状态:", event.is_set())
t.join()
输出:
工人等着信号
(等1秒)
老板点绿灯
信号绿了,干活啦
(等1秒)
活干完了
老板关绿灯
信号状态:False
- clear():把绿灯变回红灯,适合重复使用的场景。
实用场景:协调多线程
- 等所有人准备好再开工
比如三个工人得等老板一声令下一起干活:
import threading
import time
event = threading.Event()
def worker(name):
print(f"工人 {name} 准备好了")
event.wait() # 等信号
print(f"工人 {name} 开始干活")
# 创建3个工人线程
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
time.sleep(1) # 假装老板磨蹭一下
print("老板喊:全体开工!")
event.set() # 绿灯,所有人一起干
for t in threads:
t.join()
输出:
工人 0 准备好了
工人 1 准备好了
工人 2 准备好了
(等1秒)
老板喊:全体开工!
工人 0 开始干活
工人 1 开始干活
工人 2 开始干活
- 一个 Event 可以控制多个线程,信号一变,所有等着的人都动。
- 加个超时:别傻等 有时候不想无限等,可以给 wait() 加个超时:
import threading
import time
event = threading.Event()
def lazy_worker():
print("懒工人等着信号")
if event.wait(timeout=2): # 最多等2秒
print("信号来了,干活")
else:
print("等烦了,不干了")
t = threading.Thread(target=lazy_worker)
t.start()
time.sleep(3) # 老板故意拖延
print("老板姗姗来迟")
event.set()
t.join()
输出:
懒工人等着信号
(等2秒)
等烦了,不干了
(再等1秒)
老板姗姗来迟
- wait(timeout=秒数):等够时间没绿灯就返回 False,继续往下走。
高级用法:生产者-消费者模式
Event 在生产者和消费者场景里很常见,比如一个线程生产数据,另一个消费数据:
import threading
import time
import random
event = threading.Event()
data = None
def producer():
global data
for i in range(3):
time.sleep(1)
data = f"货物 {i}"
print(f"生产者生产了 {data}")
event.set() # 通知消费者
event.wait() # 等消费者拿走
event.clear() # 重置信号
def consumer():
for _ in range(3):
event.wait() # 等生产者信号
print(f"消费者拿到了 {data}")
time.sleep(random.uniform(0.5, 1)) # 模拟消费时间
event.set() # 通知生产者我拿完了
event.clear() # 重置信号
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
输出(时间随机):
生产者生产了 货物 0
消费者拿到了 货物 0
生产者生产了 货物 1
消费者拿到了 货物 1
生产者生产了 货物 2
消费者拿到了 货物 2
- 生产者用 set() 通知有货,消费者用 set() 通知拿完了。
- 用 clear() 重置信号,确保每次交易干净利落。
Event 的核心
- 基础:wait() 等信号,set() 发信号,clear() 撤信号,像红绿灯。
- 实用:控制多个线程同步开工,或者加超时避免死等。
- 高级:在生产者-消费者模式里当“传话筒”,协调线程间节奏。
Event 的牛逼之处在于简单又灵活,适合线程间需要“喊一嗓子”的场景。比起锁(Lock)复杂点,但比队列(Queue)轻量。
啥是 runcoroutinethreadsafe?
想象你在一个多线程程序里跑着,主线程用 threading 干活,但你还想偷偷塞点异步任务(asyncio 的协程)进去。问题来了:asyncio 的东西得在事件循环里跑,而线程和事件循环不是一个世界,怎么办?runcoroutinethreadsafe 就是个“跨界送信员”,帮你从线程里安全地把协程扔到 asyncio 的事件循环里去执行。
简单说:它是从线程里调用异步代码的桥梁。
怎么用? 基本用法 你得先有个跑着的事件循环,然后从线程里用这个函数提交任务。
import asyncio
import threading
import time
# 异步任务
async def say_hello():
print("Hello from asyncio")
await asyncio.sleep(1)
print("Goodbye from asyncio")
# 线程里跑的函数
def thread_task(loop):
print("线程开始干活")
# 从线程提交协程到事件循环
future = asyncio.run_coroutine_threadsafe(say_hello(), loop)
# 等结果(可选)
result = future.result() # 阻塞等协程跑完
print("线程拿到结果:", result)
# 主函数:启动事件循环
def main():
loop = asyncio.get_event_loop()
# 在线程里跑
t = threading.Thread(target=thread_task, args=(loop,))
t.start()
# 主线程跑事件循环
loop.run_forever()
t.join()
if __name__ == "__main__":
main()
输出:
线程开始干活
Hello from asyncio
(等1秒)
Goodbye from asyncio
线程拿到结果: None
关键点 1. asyncio.runcoroutinethreadsafe(协程, 事件循环): * 第一个参数是你的异步函数(协程)。 * 第二个参数是目标事件循环(得是已经跑起来的)。 * 返回一个 concurrent.futures.Future 对象,可以用 .result() 拿结果。 2. 事件循环得活着: * loop.run_forever() 让事件循环一直跑,不然协程没机会执行。 * 如果用 asyncio.run(),它会自动关循环,不适合这种场景。 3. 线程安全: * 这个函数专门设计来避免线程和 asyncio 的冲突,直接扔任务不用操心。
啥时候用? 你有个多线程程序,但某些任务想用 asyncio 的异步库(比如 aiohttp 发请求)。 主程序跑线程,但想“借用”事件循环跑点异步活儿。 比如:线程下载文件,异步上传结果。
简单例子:线程+异步混搭
import asyncio
import threading
import time
async def async_task():
await asyncio.sleep(1)
return "异步任务完成"
def thread_worker(loop):
print("线程喊:干点异步活儿")
future = asyncio.run_coroutine_threadsafe(async_task(), loop)
result = future.result()
print("线程收到:", result)
loop = asyncio.new_event_loop()
t = threading.Thread(target=thread_worker, args=(loop,))
t.start()
# 主线程跑循环
asyncio.set_event_loop(loop)
loop.run_forever()
输出:
线程喊:干点异步活儿
(等1秒)
线程收到: 异步任务完成
总结
- 作用:从线程里安全跑 asyncio 协程。
- 用法:runcoroutinethreadsafe(协程, loop),扔任务给事件循环。
- 场景:线程和异步混用时。
简单吧?就是个“跨界工具”,让线程和 asyncio 和平共处。
文档信息
版权声明:可自由转载(请注明转载出处)-非商用-非衍生
发表时间:2025年3月2日 16:16