asyncio is the asynchronous I/O framework in the Python standard library, introduced in Python 3.4, for writing concurrent code. It uses an event loop and coroutines to achieve concurrency on a single thread, which makes it particularly well suited to I/O-bound tasks.
Core concepts:

Coroutine: a special function defined with `async def` that can be suspended and resumed during execution.

```python
async def my_coroutine():
    print("starting")
    await asyncio.sleep(1)  # suspend here and hand control back to the event loop
    print("finished")
```

Event loop: the heart of asyncio, responsible for scheduling and executing coroutines.

await: waits for an asynchronous operation to complete; only valid inside an `async` function.

Task: a wrapper around a coroutine that allows multiple coroutines to run concurrently.

Future: represents the eventual result of an asynchronous operation.
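Application code rarely constructs a Future by hand (Tasks are the usual interface), but a minimal sketch can make the relationship concrete; `loop.create_future()` and `loop.call_later()` are standard event-loop APIs:

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()             # a bare Future, no coroutine attached

    # arrange for the Future's result to be set 0.1 s from now
    loop.call_later(0.1, fut.set_result, "value")

    print(await fut)                       # suspends until set_result fires

asyncio.run(main())
```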
Three ways to run a coroutine:

```python
import asyncio

async def hello():
    print("hello")
    await asyncio.sleep(1)
    print("world")

# Option 1: recommended since Python 3.7
asyncio.run(hello())

# Option 2: manage the event loop manually
# (legacy style; asyncio.get_event_loop() is deprecated for this
#  purpose since Python 3.10, so create the loop explicitly)
loop = asyncio.new_event_loop()
loop.run_until_complete(hello())
loop.close()

# Option 3: inside an already running event loop
# await hello()  # only valid inside an async function
```
Wrapping coroutines in Tasks lets them run concurrently:

```python
async def task1():
    await asyncio.sleep(2)
    return "task 1 done"

async def task2():
    await asyncio.sleep(1)
    return "task 2 done"

async def main():
    # create the tasks (they start running right away)
    t1 = asyncio.create_task(task1())
    t2 = asyncio.create_task(task2())
    # wait for both to finish
    results = await asyncio.gather(t1, t2)
    print(results)

asyncio.run(main())
```
```python
async def fetch_data(n):
    print(f"fetching data {n}")
    await asyncio.sleep(1)
    print(f"fetched data {n}")
    return f"data {n}"

async def main():
    # gather: run concurrently, return results in argument order
    results = await asyncio.gather(
        fetch_data(1),
        fetch_data(2),
        fetch_data(3)
    )
    print(results)

asyncio.run(main())
```
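gather preserves the order of its arguments. When you would rather consume results as soon as each one is ready, the standard library's asyncio.as_completed is the usual alternative; a sketch reusing fetch_data from above:

```python
async def main():
    coros = [fetch_data(1), fetch_data(2), fetch_data(3)]
    for fut in asyncio.as_completed(coros):
        result = await fut
        print(result)  # printed in completion order, not submission order

asyncio.run(main())
```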
```
┌──────────────────────────────┐
│          Event loop          │
│                              │
│  ┌────────────────────────┐  │
│  │      Ready queue       │  │
│  │  [task1, task2, task3] │  │
│  └────────────────────────┘  │
│                              │
│  ┌────────────────────────┐  │
│  │       Wait queue       │  │
│  │     [task4, task5]     │  │
│  └────────────────────────┘  │
│                              │
│  ┌────────────────────────┐  │
│  │      I/O selector      │  │
│  │  (epoll/kqueue/select) │  │
│  └────────────────────────┘  │
└──────────────────────────────┘
```
A coroutine moves through these states as it runs:

```python
async def example():
    print("1. start")         # running
    await asyncio.sleep(1)    # waiting (suspended)
    print("2. resume")        # running (resumed)
    return "done"             # finished
```
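The same life cycle can be observed from the outside through the Task API; a small sketch reusing example() from above, via the standard Task.done() method:

```python
import asyncio

async def main():
    task = asyncio.create_task(example())
    print(task.done())   # False: the coroutine is still pending/suspended
    await task
    print(task.done())   # True: finished

asyncio.run(main())
```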
```python
# Simplified event-loop pseudocode
# (Task, _has_pending_io, and event.callback are left abstract)
from collections import deque
import select

class EventLoop:
    def __init__(self):
        self._ready = deque()             # ready queue
        self._selector = select.epoll()   # I/O selector (Linux)

    def run_until_complete(self, coro):
        task = Task(coro)
        self._ready.append(task)
        while self._ready or self._has_pending_io():
            # run the tasks that are ready
            if self._ready:
                task = self._ready.popleft()
                task.step()
            # poll for I/O events
            events = self._selector.select(timeout=0)
            for event in events:
                self._ready.append(event.callback)
```
| Feature | asyncio | Multithreading (threading) | Multiprocessing (multiprocessing) |
|---|---|---|---|
| Concurrency model | Cooperative | Preemptive | True parallelism |
| Execution | Single thread | Multiple threads | Multiple processes |
| GIL impact | None | Limited by the GIL | Not limited by the GIL |
| Switching cost | Tiny (user space) | Larger (kernel space) | Largest (process switch) |
| Memory footprint | Low | Medium | High |
| Best for | I/O-bound | I/O-bound | CPU-bound |
| Data sharing | Simple (same thread) | Needs locks | Needs IPC |
| Debugging | Easy | Hard | Medium |
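The models also compose: an asyncio program can delegate CPU-bound work to a process pool through loop.run_in_executor. A minimal sketch (the cpu_intensive function is illustrative):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive(n):
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # offload CPU-bound calls to worker processes and await them
        results = await asyncio.gather(
            *[loop.run_in_executor(pool, cpu_intensive, 1_000_000) for _ in range(4)]
        )
    print(results)

if __name__ == "__main__":
    asyncio.run(main())
```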
```python
# A typical asyncio scenario: concurrent network requests
import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ['http://example.com'] * 100
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"completed {len(results)} requests")

asyncio.run(main())
```
The threading equivalent:

```python
import threading
import requests

def fetch_url(url):
    response = requests.get(url)
    print(f"done: {url}")

urls = ['http://example.com'] * 10
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
```
And multiprocessing for CPU-bound work:

```python
from multiprocessing import Pool

def cpu_intensive(n):
    return sum(i * i for i in range(n))

if __name__ == '__main__':  # required on platforms that spawn workers
    with Pool(4) as pool:
        results = pool.map(cpu_intensive, [10_000_000] * 4)
        print(results)
```
```python
import asyncio
import threading
import time

# an I/O-bound task
def io_bound_sync():
    time.sleep(1)

async def io_bound_async():
    await asyncio.sleep(1)

# benchmark asyncio
async def test_asyncio():
    start = time.time()
    await asyncio.gather(*[io_bound_async() for _ in range(100)])
    print(f"asyncio: {time.time() - start:.2f}s")    # ~1 second

# benchmark threading
def test_threading():
    start = time.time()
    threads = [threading.Thread(target=io_bound_sync) for _ in range(100)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(f"threading: {time.time() - start:.2f}s")  # ~1-2 seconds

# asyncio performs best on I/O-bound workloads
```
Async context managers implement `__aenter__` and `__aexit__`:

```python
class AsyncResource:
    async def __aenter__(self):
        print("acquiring resource")
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("releasing resource")
        await asyncio.sleep(1)

async def main():
    async with AsyncResource() as resource:
        print("using resource")

asyncio.run(main())
```
Async iterators implement `__aiter__` and `__anext__`:

```python
class AsyncRange:
    def __init__(self, start, end):
        self.current = start
        self.end = end

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.current += 1
        return self.current - 1

async def main():
    async for i in AsyncRange(0, 5):
        print(i)

asyncio.run(main())
```
An async generator achieves the same with far less code:

```python
async def async_generator():
    for i in range(5):
        await asyncio.sleep(1)
        yield i

async def main():
    async for value in async_generator():
        print(value)

asyncio.run(main())
```
Timeout control with asyncio.wait_for:

```python
async def slow_operation():
    await asyncio.sleep(5)
    return "done"

async def main():
    try:
        # give up after 2 seconds
        result = await asyncio.wait_for(slow_operation(), timeout=2)
    except asyncio.TimeoutError:
        print("operation timed out")

asyncio.run(main())
```
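On Python 3.11+ the same pattern can be written with the asyncio.timeout() context manager, which raises the built-in TimeoutError on expiry; a sketch:

```python
async def main():
    try:
        async with asyncio.timeout(2):   # Python 3.11+
            result = await slow_operation()
    except TimeoutError:
        print("operation timed out")

asyncio.run(main())
```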
```python
# Limit the number of concurrent tasks
async def limited_task(sem, n):
    async with sem:
        print(f"task {n} started")
        await asyncio.sleep(1)
        print(f"task {n} finished")

async def main():
    sem = asyncio.Semaphore(3)  # at most 3 tasks at a time
    await asyncio.gather(*[limited_task(sem, i) for i in range(10)])

asyncio.run(main())
```
Producer/consumer with asyncio.Queue:

```python
async def producer(queue, n):
    for i in range(n):
        await queue.put(i)
        print(f"produced: {i}")
        await asyncio.sleep(0.5)

async def consumer(queue, name):
    while True:
        item = await queue.get()
        print(f"{name} consumed: {item}")
        await asyncio.sleep(1)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    # start the producer and consumers
    producers = [asyncio.create_task(producer(queue, 5))]
    consumers = [asyncio.create_task(consumer(queue, f"consumer-{i}"))
                 for i in range(2)]
    await asyncio.gather(*producers)
    await queue.join()  # wait until every item has been processed
    for c in consumers:
        c.cancel()

asyncio.run(main())
```
A concurrent crawler:

```python
import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch_page(session, url):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            return await response.text()
    except Exception as e:
        print(f"error {url}: {e}")
        return None

async def parse_page(html):
    if html:
        soup = BeautifulSoup(html, 'html.parser')
        return soup.title.string if soup.title else "no title"
    return None

async def crawl_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_page(session, url) for url in urls]
        pages = await asyncio.gather(*tasks)
        titles = await asyncio.gather(*[parse_page(page) for page in pages])
        return titles

urls = [
    'http://example.com',
    'http://example.org',
    'http://example.net',
]

# run the crawler
# titles = asyncio.run(crawl_urls(urls))
# print(titles)
```
Async database access with aiosqlite:

```python
import asyncio
import aiosqlite

async def create_table(db):
    await db.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT,
            email TEXT
        )
    ''')
    await db.commit()

async def insert_user(db, name, email):
    await db.execute(
        'INSERT INTO users (name, email) VALUES (?, ?)',
        (name, email)
    )
    await db.commit()

async def fetch_users(db):
    async with db.execute('SELECT * FROM users') as cursor:
        return await cursor.fetchall()

async def main():
    async with aiosqlite.connect('test.db') as db:
        await create_table(db)
        # concurrent inserts (serialized internally on one connection)
        await asyncio.gather(
            insert_user(db, 'alice', 'alice@example.com'),
            insert_user(db, 'bob', 'bob@example.com'),
            insert_user(db, 'charlie', 'charlie@example.com')
        )
        users = await fetch_users(db)
        for user in users:
            print(user)

# asyncio.run(main())
```
A TCP echo server with asyncio streams:

```python
import asyncio

async def handle_client(reader, writer):
    data = await reader.read(1024)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print(f"message from {addr}: {message}")
    response = f"echo: {message}"
    writer.write(response.encode())
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def start_server():
    server = await asyncio.start_server(
        handle_client, '127.0.0.1', 8888
    )
    addr = server.sockets[0].getsockname()
    print(f"serving on {addr}")
    async with server:
        await server.serve_forever()

# asyncio.run(start_server())
```
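A matching client is easy to sketch with the standard asyncio.open_connection, assuming the server above is listening on 127.0.0.1:8888:

```python
import asyncio

async def echo_client(message):
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
    writer.write(message.encode())
    await writer.drain()
    data = await reader.read(1024)
    print(f"received: {data.decode()}")
    writer.close()
    await writer.wait_closed()

# asyncio.run(echo_client("hello"))
```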
Real-time stream processing:

```python
import asyncio
import random

async def data_stream():
    """Simulate a data stream."""
    while True:
        yield random.randint(1, 100)
        await asyncio.sleep(0.5)

async def process_data(value):
    """Process one data point."""
    await asyncio.sleep(0.1)
    return value * 2

async def monitor_stream():
    """Monitor and process the data stream."""
    buffer = []
    async for data in data_stream():
        print(f"received: {data}")
        # process the data point asynchronously
        task = asyncio.create_task(process_data(data))
        buffer.append(task)
        # process in batches of 5
        if len(buffer) >= 5:
            results = await asyncio.gather(*buffer)
            print(f"batch results: {results}")
            buffer = []
        # for the demo, stop once a value above 20 arrives
        if data > 20:
            break

# asyncio.run(monitor_stream())
```
Coordinating task groups by priority:

```python
import asyncio

async def task_with_priority(name, priority, duration):
    print(f"[priority {priority}] {name} started")
    await asyncio.sleep(duration)
    print(f"[priority {priority}] {name} finished")
    return f"{name} result"

async def coordinator():
    # create tasks with different priorities
    high_priority = [
        task_with_priority(f"high-{i}", 1, 1)
        for i in range(3)
    ]
    low_priority = [
        task_with_priority(f"low-{i}", 3, 2)
        for i in range(3)
    ]
    # run the high-priority batch first
    high_results = await asyncio.gather(*high_priority)
    print(f"high priority done: {high_results}")
    # then the low-priority batch
    low_results = await asyncio.gather(*low_priority)
    print(f"low priority done: {low_results}")

asyncio.run(coordinator())
```
Error handling across concurrent tasks:

```python
async def safe_task(n):
    try:
        if n == 3:
            raise ValueError("bad value")
        await asyncio.sleep(1)
        return f"task {n} succeeded"
    except Exception as e:
        print(f"task {n} failed: {e}")
        return None

async def main():
    results = await asyncio.gather(
        safe_task(1),
        safe_task(2),
        safe_task(3),
        return_exceptions=True  # one failing task won't cancel the rest
    )
    print(results)

asyncio.run(main())
```
Managing a connection's life cycle with an async context manager:

```python
class AsyncConnection:
    async def __aenter__(self):
        self.conn = await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def connect(self):
        print("opening connection")
        await asyncio.sleep(1)
        return "connection"

    async def close(self):
        print("closing connection")
        await asyncio.sleep(1)

async def main():
    async with AsyncConnection() as conn:
        print(f"using connection: {conn.conn}")

asyncio.run(main())
```
```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# Wrong: a blocking call
async def bad_example():
    time.sleep(5)  # this blocks the entire event loop!

# Right: run the blocking work in an executor
async def good_example():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        result = await loop.run_in_executor(executor, blocking_function)
    return result

def blocking_function():
    time.sleep(5)
    return "done"
```
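On Python 3.9+ the same idea is available as a one-liner via asyncio.to_thread, a convenience wrapper over run_in_executor:

```python
async def good_example_v2():
    # runs blocking_function in the default thread pool
    result = await asyncio.to_thread(blocking_function)
    return result
```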
Always set timeouts with a fallback path (long_running_task is a placeholder here):

```python
async def with_timeout():
    try:
        result = await asyncio.wait_for(
            long_running_task(),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        print("task timed out, falling back")
        result = "default value"
    return result
```
On Python 3.11+, asyncio.TaskGroup offers structured concurrency (some_coro and another_coro are placeholders):

```python
async def main():
    async with asyncio.TaskGroup() as tg:  # Python 3.11+
        task1 = tg.create_task(some_coro())
        task2 = tg.create_task(another_coro())
    # the block exits when all tasks finish, or as soon as one fails
    print("all tasks done")
```
✅ Good fit: I/O-bound workloads such as concurrent network requests, web crawlers, database access, network servers, and stream processing.

❌ Poor fit: CPU-bound computation (use multiprocessing instead) and code built around blocking libraries with no async alternatives.
Conclusion: asyncio is a powerful tool for asynchronous programming in Python, and mastering it can significantly improve the performance of I/O-bound applications. Understanding its internals and best practices will help you write efficient, maintainable asynchronous code.