Python 的高级玩法异步和线程

asyncio

Python 的 asyncio,从啥都不懂的初级到能玩转异步编程的高级用法。asyncio 是 Python 用来处理异步编程的库,简单说就是让程序在“等东西”的时候不傻乎乎干等着,而是去干点别的活儿,提高效率。咱们慢慢来,别急,一步步走。

啥是异步?怎么跑起来?

1. 理解异步的核心:别干等

想象你在煮饭:水开了要等 10 分钟才能下面条。同步编程就像你傻站那儿盯着锅等 10 分钟啥也不干;异步编程呢,就是水烧着的时候你去刷个碗、切个菜,等水开了再回来下面条。asyncio 就是帮你实现这种“不干等”的工具。

2. 最简单的起步:跑个 Hello World

Python 自带 asyncio,不用装啥,直接开干。咱先写个最简单的异步代码:

import asyncio

async def say_hello():
    print("Hello")
    await asyncio.sleep(1)  # 假装等1秒,就像烧水
    print("World")

asyncio.run(say_hello())
  • async def:定义一个异步函数,告诉 Python 这函数可以“暂停”。
  • await:遇到 await 就先暂停,等它后面的东西(比如 sleep)干完再继续。
  • asyncio.run():启动异步程序的入口。

跑一下,输出是:

Hello
(等1秒)
World

这就是异步的基本味儿:程序不会卡死,能“暂停”再“继续”。

多干点活儿:跑多个任务

光一个任务没啥意思,异步的牛逼之处是能同时干多个事。比如:

import asyncio

async def task1():
    print("任务1开始")
    await asyncio.sleep(2)
    print("任务1结束")

async def task2():
    print("任务2开始")
    await asyncio.sleep(1)
    print("任务2结束")

async def main():
    await asyncio.gather(task1(), task2())  # 一起跑

asyncio.run(main())

输出:

任务1开始
任务2开始
任务2结束  # 1秒后
任务1结束  # 2秒后
  • asyncio.gather():把多个任务塞一块儿跑,类似于“烧水的时候切菜”。
  • 任务1等2秒,任务2等1秒,但它们是一起开始的,总时间只花了2秒,而不是3秒(2+1)。

整点实用的活儿

加点交互:模拟网络请求

现实中,异步最常用在等网络请求(比如爬网页、调接口)。咱用 sleep 模拟一下延迟:

import asyncio

async def fetch_data(name, delay):
    print(f"{name} 开始请求数据")
    await asyncio.sleep(delay)  # 模拟网络延迟
    print(f"{name} 数据拿到啦")
    return f"{name} 的结果"

async def main():
    results = await asyncio.gather(
        fetch_data("网页A", 2),
        fetch_data("网页B", 1),
        fetch_data("网页C", 3)
    )
    print("所有结果:", results)

asyncio.run(main())

输出:

网页A 开始请求数据
网页B 开始请求数据
网页C 开始请求数据
网页B 数据拿到啦  # 1秒
网页A 数据拿到啦  # 2秒
网页C 数据拿到啦  # 3秒
所有结果: ['网页A 的结果', '网页B 的结果', '网页C 的结果']
  • 这里模拟了三个“请求”,每个耗时不同,但总耗时是3秒(最长的那个),而不是6秒(2+1+3)。

加个超时:别等太久

有时候网络太慢,咱得加个“等不及就放弃”的逻辑:

import asyncio

async def slow_task():
    await asyncio.sleep(5)
    return "终于好了"

async def main():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=2)  # 只等2秒
        print(result)
    except asyncio.TimeoutError:
        print("等不及了,超时了!")

asyncio.run(main())

输出:

等不及了,超时了!
  • asyncio.wait_for():设置超时,超过时间就抛异常。

整复杂点的活儿

任务调度:手动控制谁先跑

有时候你不想全扔一块儿跑,而是手动控制任务。可以用 asyncio.create_task()

import asyncio

async def task1():
    await asyncio.sleep(1)
    print("任务1完成")

async def task2():
    await asyncio.sleep(2)
    print("任务2完成")

async def main():
    t1 = asyncio.create_task(task1())  # 创建任务,马上跑
    t2 = asyncio.create_task(task2())
    print("任务都安排上了")
    await t1  # 等任务1完成
    print("任务1好了,继续干别的")
    await t2  # 再等任务2

asyncio.run(main())

输出:

任务都安排上了
任务1完成  # 1秒
任务1好了,继续干别的
任务2完成  # 2秒
  • create_task():把任务扔到后台跑,马上返回一个“任务对象”,你可以随时 await 它。

队列和生产者-消费者模式

异步还能玩“流水线”,比如一个任务生产数据,另一个消费数据:

import asyncio
import random

async def producer(queue):
    for i in range(5):
        await asyncio.sleep(1)
        item = f"货物{i}"
        print(f"生产了 {item}")
        await queue.put(item)  # 放进队列

async def consumer(queue):
    while True:
        item = await queue.get()  # 从队列拿东西
        print(f"消费了 {item}")
        await asyncio.sleep(random.uniform(0.5, 1.5))  # 随机耗时
        queue.task_done()  # 标记任务完成

async def main():
    queue = asyncio.Queue()  # 创建队列
    await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())

输出(随机时间会有变化):

生产了 货物0
消费了 货物0
生产了 货物1
消费了 货物1
生产了 货物2
...
  • asyncio.Queue():异步队列,生产者放东西,消费者拿东西,适合多人协作的场景。

异常处理:别崩了

异步任务多了,崩了咋办?加个异常处理:

import asyncio

async def risky_task():
    await asyncio.sleep(1)
    raise ValueError("我崩了!")

async def main():
    try:
        await asyncio.gather(risky_task(), return_exceptions=True)  # 捕获异常
    except ValueError as e:
        print(f"抓到错误:{e}")

asyncio.run(main())

输出:

抓到错误:我崩了!
  • return_exceptions=True:让 gather 不直接抛异常,而是返回错误对象。

总结:从初级到高级

  • 初级:学会 async/await,用 asyncio.run() 跑简单任务,理解“暂停”和“并发”。
  • 中级:用 gather 跑多个任务,模拟网络请求,加超时控制。
  • 高级:用 create_task 调度任务,玩队列模式,处理异常。

异步编程的核心就是“别干等”,多任务并行干活儿。实际用的时候,比如爬虫、服务器、网络请求,asyncio 配合 aiohttp(异步HTTP库)会更香。

高级用法 1:事件循环(Event Loop)手动操控

asyncio.run() 其实是个“懒人封装”,背后真正的“大脑”是事件循环(Event Loop)。高级场景下,你可能需要自己掌控这个循环。

啥是事件循环? * 事件循环就像个“任务调度员”,负责安排谁跑、谁暂停、啥时候继续。默认 asyncio.run() 会帮你启动和关闭它,但你也可以手动操作。

手动跑循环

import asyncio

async def say_hi():
    print("Hi")
    await asyncio.sleep(1)
    print("Bye")

loop = asyncio.get_event_loop()  # 拿到事件循环
loop.run_until_complete(say_hi())  # 跑一个任务
loop.close()  # 关掉循环
  • geteventloop():拿到当前的事件循环。
  • rununtilcomplete():跑某个异步任务直到结束。

为啥要手动?

比如你想在一个程序里跑多个阶段的任务,或者在循环里塞点非异步的代码:

import asyncio

async def task(name):
    print(f"{name} 开始")
    await asyncio.sleep(1)
    print(f"{name} 结束")

loop = asyncio.get_event_loop()
loop.run_until_complete(task("第一波"))  # 第一阶段
print("中间干点别的")
loop.run_until_complete(task("第二波"))  # 第二阶段
loop.close()

输出:

第一波 开始
第一波 结束
中间干点别的
第二波 开始
第二波 结束

注意:一个循环只能用一次,关了就不能再开。想多次跑,得用 asyncio.neweventloop() 创建新的。

高级点:自定义调度

可以用 callsoon 或 calllater 在循环里插队跑普通函数:

import asyncio

def normal_func():
    print("我是一个普通函数")

async def main():
    loop = asyncio.get_running_loop()  # 获取当前循环
    loop.call_later(1, normal_func)  # 1秒后跑普通函数
    await asyncio.sleep(2)  # 等2秒,确保看到效果

asyncio.run(main())

输出:

(等1秒)
我是一个普通函数
(再等1秒程序结束)

  • call_later(秒数, 函数):延迟调用普通函数,适合混合同步代码。

高级用法 2:异步上下文管理器

有时候你需要初始化和清理资源(比如数据库连接),异步版本的 with 语句能帮你。

自定义异步上下文

import asyncio

class AsyncResource:
    async def __aenter__(self):
        print("资源初始化")
        await asyncio.sleep(1)  # 模拟异步初始化
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("资源清理")
        await asyncio.sleep(1)  # 模拟异步清理

async def main():
    async with AsyncResource() as r:
        print("使用资源中")
        await asyncio.sleep(1)

asyncio.run(main())

输出:

资源初始化
(等1秒)
使用资源中
(等1秒)
资源清理
(等1秒)

*aenter:进入时异步初始化。 *aexit:退出时异步清理。

实际用处

比如异步文件操作(用 aiofiles 库):

import aiofiles
import asyncio

async def write_file():
    async with aiofiles.open("test.txt", "w") as f:
        await f.write("异步写入文件")
    print("写完了")

asyncio.run(write_file())
  • aiofiles 是异步文件操作库,搭配 asyncio 用,避免阻塞。

高级用法 3:信号处理与任务取消

异步程序跑着跑着,可能得中途停下来,或者响应外部信号(比如 Ctrl+C)。

取消任务

import asyncio

async def long_task():
    try:
        print("开始长任务")
        await asyncio.sleep(10)
        print("长任务结束")
    except asyncio.CancelledError:
        print("任务被取消了!")
        raise  # 抛出异常,确保清理

async def main():
    task = asyncio.create_task(long_task())
    await asyncio.sleep(2)  # 等2秒
    task.cancel()  # 取消任务
    try:
        await task  # 等任务被取消
    except asyncio.CancelledError:
        print("取消确认")

asyncio.run(main())

输出:

开始长任务
(等2秒)
任务被取消了!
取消确认
  • task.cancel():取消某个任务。
  • CancelledError:捕获取消时的异常。

响应信号

比如优雅地停止程序:

import asyncio
import signal

async def worker():
    while True:
        print("我在干活")
        await asyncio.sleep(1)

def stop_loop(loop):
    print("收到停止信号,关机啦")
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, stop_loop, loop)  # 捕获 Ctrl+C
    await worker()

asyncio.run(main())

跑起来后按 Ctrl+C:

我在干活
我在干活
^C收到停止信号,关机啦
  • addsignalhandler:绑定信号(比如 SIGINT)到回调函数。

高级用法 4:异步迭代器与生成器

异步还能玩“边等边拿”的迭代器,适合处理流式数据。

异步迭代器

import asyncio

class AsyncCounter:
    def __init__(self, stop):
        self.current = 0
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(1)  # 模拟异步等待
        self.current += 1
        return self.current - 1

async def main():
    async for num in AsyncCounter(3):
        print(f"拿到数字:{num}")

asyncio.run(main())

输出:

(等1秒)
拿到数字:0
(等1秒)
拿到数字:1
(等1秒)
拿到数字:2
  • async for:异步迭代,边等边处理。

异步生成器

更简单点的写法:

import asyncio

async def async_gen():
    for i in range(3):
        await asyncio.sleep(1)
        yield i  # 异步生成值

async def main():
    async for num in async_gen():
        print(f"生成:{num}")

asyncio.run(main())

输出跟上面一样。异步生成器用 yield 产出值,适合流式处理大数据。

高级用法 5:线程与异步混用

有时候你得跑点阻塞的同步代码(比如第三方库不支持异步),可以用 runinexecutor 扔到线程里。

import asyncio
import time

def blocking_task():
    time.sleep(2)  # 模拟阻塞
    return "阻塞任务完成"

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_task)  # 扔到默认线程池
    print(result)

asyncio.run(main())

输出:

(等2秒)
阻塞任务完成
  • runinexecutor(None, 函数):把同步函数扔到线程池跑,不卡主循环。

总结:高级玩法的精髓

  • 事件循环:手动控制任务调度,混合同步代码。
  • 上下文管理器:优雅处理异步资源的初始化和清理。
  • 任务取消与信号:让程序能随时停下来,响应外部变化。
  • 异步迭代器/生成器:处理流式数据,边等边用。
  • 线程混用:兼容阻塞代码,不让它拖后腿。

这些用法适合复杂场景,比如写服务器(用 asyncio 搭个 TCP/UDP 服务)、爬虫(配合 aiohttp)、实时系统(处理 WebSocket)。

threading

Python 的 threading 模块,从啥都不懂的初级到能玩转多线程的高级用法。threading 是 Python 用来处理多线程的工具,简单说就是让程序同时干多件事儿,比如一边下载文件一边播放音乐。

初级:啥是线程?怎么跑起来?

1. 理解线程:多干点活儿

想象你在饭店点餐:普通程序(单线程)就像服务员只盯着你,等你吃完再去招呼别人;多线程呢,就像饭店有好几个服务员,同时招呼好几桌客人。threading 就是帮你“雇”几个服务员,让程序并行干活。

2. 最简单的起步:跑个线程

Python 自带 threading 模块,不用装啥,直接开干。咱先写个最简单的多线程代码:

import threading
import time

def say_hello():
    print("Hello")
    time.sleep(1)  # 假装干活1秒
    print("World")

# 创建线程
t = threading.Thread(target=say_hello)
# 启动线程
t.start()
# 主线程继续干活
print("主线程不等你,我先跑了")

跑一下,输出可能是:

Hello
主线程不等你,我先跑了
(等1秒)
World
  • threading.Thread(target=函数):创建一个线程,告诉它要跑哪个函数。
  • start():启动线程,让它开始干活。
  • 主线程(程序默认的线程)和新线程一起跑,谁先谁后看运气。

3. 等线程干完:join

有时候你想让主线程等等其他线程干完再走,用 join():

import threading
import time

def task():
    print("任务开始")
    time.sleep(2)
    print("任务结束")

t = threading.Thread(target=task)
t.start()
t.join()  # 主线程在这等着
print("线程干完了,我再跑")

输出:

任务开始
(等2秒)
任务结束
线程干完了,我再跑
  • join():让主线程等着,直到这个线程干完。

4. 多线程一起跑

再加几个线程试试:

import threading
import time

def worker(name):
    print(f"工人 {name} 开始干活")
    time.sleep(1)
    print(f"工人 {name} 干完了")

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()  # 等所有线程干完

print("所有工人下班了")

输出可能是:

工人 0 开始干活
工人 1 开始干活
工人 2 开始干活
(等1秒)
工人 0 干完了
工人 1 干完了
工人 2 干完了
所有工人下班了
  • args=(i,):给线程函数传参数。
  • 多个线程几乎同时启动,总耗时是1秒,而不是3秒(1+1+1)。

中级:整点实用的活儿

1. 线程安全的共享数据:锁

多线程一起跑有个坑:如果大家同时改同一块数据,可能乱套。比如计数器:

import threading

counter = 0

def add():
    global counter
    for _ in range(100000):
        counter += 1

t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)
t1.start()
t2.start()
t1.join()
t2.join()
print("计数器结果:", counter)

你可能期待结果是 200000(两个线程各加 100000),但实际可能是 180000 之类的。为什么?因为线程抢着改 counter,会“撞车”。

解决办法:加锁

import threading

counter = 0
lock = threading.Lock()  # 创建锁

def add():
    global counter
    for _ in range(100000):
        with lock:  # 上锁
            counter += 1

t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)
t1.start()
t2.start()
t1.join()
t2.join()
print("计数器结果:", counter)

输出:

计数器结果:200000
  • threading.Lock():创建一个锁。
  • with lock

threading 模块里的 Event

Event 是多线程编程里的一个“信号灯”,用来让线程之间互相“喊话”,协调谁干活、谁等着。

啥是 Event?

想象你在组织一个饭局,大家都到了才能开吃。Event 就像饭店老板娘,等所有人到齐,她喊一声“开饭啦”,大家才动筷子。在线程里,Event 是一个信号工具:

  • 默认是“红灯”(未设置状态,False),线程看到红灯就等着。
  • 有人把信号设成“绿灯”(True),等着的人就知道可以干活了。

基础用法:红灯绿灯

  1. 创建和简单使用
import threading
import time

# 创建一个 Event 对象
event = threading.Event()

def waiter():
    print("小弟等着老板喊开工")
    event.wait()  # 等绿灯
    print("老板喊了,小弟开干!")

def boss():
    print("老板先忙别的")
    time.sleep(2)  # 假装忙2秒
    print("老板喊:开工啦!")
    event.set()  # 红灯变绿灯

# 创建线程
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=boss)
t1.start()
t2.start()
t1.join()
t2.join()

输出:

小弟等着老板喊开工
老板先忙别的
(等2秒)
老板喊:开工啦!
小弟开干!
  • threading.Event():创建一个信号对象,默认是红灯(False)。
  • event.wait():线程在这等着,直到信号变绿(True)。
  • event.set():把信号设成绿灯,通知所有等着的人。
  1. 检查信号状态

可以用 is_set() 看看当前是红灯还是绿灯:

import threading

event = threading.Event()
print("信号状态:", event.is_set())  # False,红灯
event.set()
print("信号状态:", event.is_set())  # True,绿灯

输出:

信号状态:False
信号状态:True
  1. 关掉绿灯:clear

信号变绿后,可以用 clear() 把它变回红灯:

import threading
import time

event = threading.Event()

def worker():
    print("工人等着信号")
    event.wait()
    print("信号绿了,干活啦")
    time.sleep(1)
    print("活干完了")

t = threading.Thread(target=worker)
t.start()
time.sleep(1)
print("老板点绿灯")
event.set()  # 绿灯
time.sleep(2)
print("老板关绿灯")
event.clear()  # 变回红灯
print("信号状态:", event.is_set())
t.join()

输出:

工人等着信号
(等1秒)
老板点绿灯
信号绿了,干活啦
(等1秒)
活干完了
老板关绿灯
信号状态:False
  • clear():把绿灯变回红灯,适合重复使用的场景。

实用场景:协调多线程

  1. 等所有人准备好再开工

比如三个工人得等老板一声令下一起干活:

import threading
import time

event = threading.Event()

def worker(name):
    print(f"工人 {name} 准备好了")
    event.wait()  # 等信号
    print(f"工人 {name} 开始干活")

# 创建3个工人线程
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

time.sleep(1)  # 假装老板磨蹭一下
print("老板喊:全体开工!")
event.set()  # 绿灯,所有人一起干

for t in threads:
    t.join()

输出:

工人 0 准备好了
工人 1 准备好了
工人 2 准备好了
(等1秒)
老板喊:全体开工!
工人 0 开始干活
工人 1 开始干活
工人 2 开始干活
  • 一个 Event 可以控制多个线程,信号一变,所有等着的人都动。
  1. 加个超时:别傻等 有时候不想无限等,可以给 wait() 加个超时:
import threading
import time

event = threading.Event()

def lazy_worker():
    print("懒工人等着信号")
    if event.wait(timeout=2):  # 最多等2秒
        print("信号来了,干活")
    else:
        print("等烦了,不干了")

t = threading.Thread(target=lazy_worker)
t.start()
time.sleep(3)  # 老板故意拖延
print("老板姗姗来迟")
event.set()
t.join()

输出:

懒工人等着信号
(等2秒)
等烦了,不干了
(再等1秒)
老板姗姗来迟
  • wait(timeout=秒数):等够时间没绿灯就返回 False,继续往下走。

高级用法:生产者-消费者模式

Event 在生产者和消费者场景里很常见,比如一个线程生产数据,另一个消费数据:

import threading
import time
import random

event = threading.Event()
data = None

def producer():
    global data
    for i in range(3):
        time.sleep(1)
        data = f"货物 {i}"
        print(f"生产者生产了 {data}")
        event.set()  # 通知消费者
        event.wait()  # 等消费者拿走
        event.clear()  # 重置信号

def consumer():
    for _ in range(3):
        event.wait()  # 等生产者信号
        print(f"消费者拿到了 {data}")
        time.sleep(random.uniform(0.5, 1))  # 模拟消费时间
        event.set()  # 通知生产者我拿完了
        event.clear()  # 重置信号

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()

输出(时间随机):

生产者生产了 货物 0
消费者拿到了 货物 0
生产者生产了 货物 1
消费者拿到了 货物 1
生产者生产了 货物 2
消费者拿到了 货物 2
  • 生产者用 set() 通知有货,消费者用 set() 通知拿完了。
  • 用 clear() 重置信号,确保每次交易干净利落。

Event 的核心

  • 基础:wait() 等信号,set() 发信号,clear() 撤信号,像红绿灯。
  • 实用:控制多个线程同步开工,或者加超时避免死等。
  • 高级:在生产者-消费者模式里当“传话筒”,协调线程间节奏。

Event 的牛逼之处在于简单又灵活,适合线程间需要“喊一嗓子”的场景。比起锁(Lock)复杂点,但比队列(Queue)轻量。

啥是 runcoroutinethreadsafe?

想象你在一个多线程程序里跑着,主线程用 threading 干活,但你还想偷偷塞点异步任务(asyncio 的协程)进去。问题来了:asyncio 的东西得在事件循环里跑,而线程和事件循环不是一个世界,怎么办?runcoroutinethreadsafe 就是个“跨界送信员”,帮你从线程里安全地把协程扔到 asyncio 的事件循环里去执行。

简单说:它是从线程里调用异步代码的桥梁。

怎么用? 基本用法 你得先有个跑着的事件循环,然后从线程里用这个函数提交任务。

import asyncio
import threading
import time

# 异步任务
async def say_hello():
    print("Hello from asyncio")
    await asyncio.sleep(1)
    print("Goodbye from asyncio")

# 线程里跑的函数
def thread_task(loop):
    print("线程开始干活")
    # 从线程提交协程到事件循环
    future = asyncio.run_coroutine_threadsafe(say_hello(), loop)
    # 等结果(可选)
    result = future.result()  # 阻塞等协程跑完
    print("线程拿到结果:", result)

# 主函数:启动事件循环
def main():
    loop = asyncio.get_event_loop()
    # 在线程里跑
    t = threading.Thread(target=thread_task, args=(loop,))
    t.start()
    # 主线程跑事件循环
    loop.run_forever()
    t.join()

if __name__ == "__main__":
    main()

输出:

线程开始干活
Hello from asyncio
(等1秒)
Goodbye from asyncio
线程拿到结果: None

关键点 1. asyncio.runcoroutinethreadsafe(协程, 事件循环): * 第一个参数是你的异步函数(协程)。 * 第二个参数是目标事件循环(得是已经跑起来的)。 * 返回一个 concurrent.futures.Future 对象,可以用 .result() 拿结果。 2. 事件循环得活着: * loop.run_forever() 让事件循环一直跑,不然协程没机会执行。 * 如果用 asyncio.run(),它会自动关循环,不适合这种场景。 3. 线程安全: * 这个函数专门设计来避免线程和 asyncio 的冲突,直接扔任务不用操心。

啥时候用? 你有个多线程程序,但某些任务想用 asyncio 的异步库(比如 aiohttp 发请求)。 主程序跑线程,但想“借用”事件循环跑点异步活儿。 比如:线程下载文件,异步上传结果。

简单例子:线程+异步混搭

import asyncio
import threading
import time

async def async_task():
    await asyncio.sleep(1)
    return "异步任务完成"

def thread_worker(loop):
    print("线程喊:干点异步活儿")
    future = asyncio.run_coroutine_threadsafe(async_task(), loop)
    result = future.result()
    print("线程收到:", result)

loop = asyncio.new_event_loop()
t = threading.Thread(target=thread_worker, args=(loop,))
t.start()

# 主线程跑循环
asyncio.set_event_loop(loop)
loop.run_forever()

输出:

线程喊:干点异步活儿
(等1秒)
线程收到: 异步任务完成

总结

  • 作用:从线程里安全跑 asyncio 协程。
  • 用法:runcoroutinethreadsafe(协程, loop),扔任务给事件循环。
  • 场景:线程和异步混用时。

简单吧?就是个“跨界工具”,让线程和 asyncio 和平共处。

文档信息

版权声明:可自由转载(请注明转载出处)-非商用-非衍生

发表时间:2025年3月2日 16:16