101人参与 • 2026-05-15 • Python
asyncio.queue 是 python 标准库 asyncio 模块中的异步队列实现,位于 lib/asyncio/queues.py。它的设计思路与线程安全的 queue.queue 类似,但专门为 async/await 协程模型 设计,不是线程安全的。
核心定位:在异步并发环境下,作为生产者-消费者模式的协调桥梁,实现协程之间的数据传递与流量控制。
import asyncio queue = asyncio.queue(maxsize=10) # 有界队列,容量10 queue = asyncio.queue() # 无界队列(maxsize=0)
从 cpython 源码来看,asyncio.queue 的核心数据结构非常精巧:
class queue(mixins._loopboundmixin):
def __init__(self, maxsize=0):
self._maxsize = maxsize
self._getters = collections.deque() # 等待获取的 future 队列
self._putters = collections.deque() # 等待放入的 future 队列
self._unfinished_tasks = 0 # 未完成任务计数
self._finished = locks.event() # join() 阻塞用的事件
self._finished.set()
self._init(maxsize)
self._is_shutdown = false
| 内部属性 | 作用 |
|---|---|
| _queue | 实际存储数据的 collections.deque(fifo) |
| _getters | 当队列为空时,get() 的调用者会创建一个 future 并挂入此 deque 等待 |
| _putters | 当队列已满时,put() 的调用者会创建一个 future 并挂入此 deque 等待 |
| _unfinished_tasks | 配合 task_done() / join() 实现任务追踪 |
| _finished | 一个 asyncio.event,当未完成任务归零时被 set,唤醒 join() |
put() 和 get() 的阻塞并非真正的线程阻塞,而是通过 future + await 实现的协程挂起:
# put() 核心逻辑(简化)
async def put(self, item):
while self.full():
putter = self._get_loop().create_future()
self._putters.append(putter)
await putter # 协程在此挂起
self.put_nowait(item)
# get() 核心逻辑(简化)
async def get(self):
while self.empty():
getter = self._get_loop().create_future()
self._getters.append(getter)
await getter # 协程在此挂起
return self.get_nowait()
当对面操作发生时,通过 _wakeup_next() 唤醒等待者:
def _wakeup_next(self, waiters):
while waiters:
waiter = waiters.popleft()
if not waiter.done():
waiter.set_result(none) # 唤醒一个等待的协程
break
这种 逐个唤醒(one-by-one wakeup) 设计避免了"惊群效应",保证公平性:先等待的协程先被唤醒。
asyncio.queue(maxsize=0)
maxsize <= 0:无界队列,put() 永远不会阻塞(内存允许的前提下)maxsize > 0:有界队列,队列满时 put() 会挂起等待将元素放入队列。如果队列已满,挂起当前协程直到有空间。
非阻塞放入。队列满时立即抛出 queuefull 异常。
从队列取出并返回一个元素。如果队列为空,挂起当前协程直到有元素可用。
非阻塞获取。队列空时立即抛出 queueempty 异常。
标记一个之前通过 get() 获取的任务已完成。内部将 _unfinished_tasks 减 1。当计数归零时,触发 _finished 事件,解除 join() 的阻塞。
注意:调用次数超过 put() 的次数会抛出 valueerror。
阻塞直到队列中所有任务都被处理完毕(每个 put 进去的任务都收到了对应的 task_done() 调用)。
# 典型用法 await queue.join() # 等待所有任务完成
| 方法 | 说明 |
|---|---|
| qsize() | 返回队列当前元素数量 |
| empty() | 队列是否为空 |
| full() | 队列是否已满(maxsize=0 时永远返回 false) |
| maxsize | 属性,返回队列容量上限 |
与 threading.queue 不同,由于单线程事件循环的特性,qsize() 的返回值在 await 之前是可靠的——不会被其他线程中断。
queue.shutdown() # 优雅关闭:允许消费完剩余元素 queue.shutdown(immediate=true) # 立即关闭:清空队列,中断所有等待
asyncio 提供了三种队列,它们通过重写 _init、_get、_put 三个内部方法实现不同的出队策略:
def _init(self, maxsize):
self._queue = collections.deque()
def _put(self, item):
self._queue.append(item)
def _get(self):
return self._queue.popleft()
def _init(self, maxsize):
self._queue = []
def _put(self, item, heappush=heapq.heappush):
heappush(self._queue, item)
def _get(self, heappop=heapq.heappop):
return heappop(self._queue)
元素通常为 (priority, data) 元组,数值越小优先级越高。
def _init(self, maxsize):
self._queue = []
def _put(self, item):
self._queue.append(item)
def _get(self):
return self._queue.pop() # 从尾部取出
| 异常 | 触发时机 |
|---|---|
| asyncio.queueempty | get_nowait() 在空队列上调用 |
| asyncio.queuefull | put_nowait() 在满队列上调用 |
| asyncio.queueshutdown | 在已 shutdown() 的队列上调用 put() 或 get()(3.13+) |
import asyncio
import random
async def producer(queue: asyncio.queue, name: str):
for i in range(5):
item = f"{name}-item-{i}"
await queue.put(item)
print(f"[{name}] 生产: {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
async def consumer(queue: asyncio.queue, name: str):
while true:
item = await queue.get()
print(f" [{name}] 消费: {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def main():
queue = asyncio.queue(maxsize=3) # 有界队列,产生背压
# 启动生产者和消费者
producers = [asyncio.create_task(producer(queue, f"p{i}")) for i in range(2)]
consumers = [asyncio.create_task(consumer(queue, f"c{i}")) for i in range(3)]
# 等待所有生产完成
await asyncio.gather(*producers)
# 等待队列中所有任务被消费完
await queue.join()
# 取消消费者(它们在 await queue.get() 处无限等待)
for c in consumers:
c.cancel()
asyncio.run(main())
asyncio.queue 的方法本身不接受 timeout 参数,需要配合 asyncio.wait_for() 使用:
try:
item = await asyncio.wait_for(queue.get(), timeout=5.0)
except asyncio.timeouterror:
print("等待超时,队列中没有新数据")
try:
await asyncio.wait_for(queue.put(item), timeout=3.0)
except asyncio.timeouterror:
print("队列已满,放入超时")
asyncio.queue 不是线程安全的。 如果你需要从非 asyncio 线程向队列中投递任务,必须使用 loop.call_soon_threadsafe():
# 从其他线程安全地放入数据 loop.call_soon_threadsafe(queue.put_nowait, item)
或者使用 asyncio.run_coroutine_threadsafe() 调用异步方法:
future = asyncio.run_coroutine_threadsafe(queue.put(item), loop) future.result() # 阻塞等待完成
| 特性 | queue.queue | asyncio.queue |
|---|---|---|
| 线程安全 | 是(内置锁) | 否(单线程事件循环) |
| 阻塞方式 | 线程阻塞(真正挂起 os 线程) | 协程挂起(让出事件循环) |
| timeout 参数 | get(timeout=5) | 需配合 asyncio.wait_for() |
| qsize() 可靠性 | 不可靠(多线程竞争) | 在 await 点之间可靠 |
| join() / task_done() | 支持 | 支持 |
| shutdown() | 3.13+ 支持 | 3.13+ 支持 |
| 适用场景 | 多线程 | asyncio 协程 |
async def main():
queue = asyncio.queue()
# ... 启动 workers ...
# 生产完毕后,优雅关闭队列
queue.shutdown()
await queue.join()
sentinel = object()
async def consumer(queue):
while true:
item = await queue.get()
if item is sentinel:
queue.task_done()
break
process(item)
queue.task_done()
# 向每个 consumer 发送一个哨兵
for _ in range(num_consumers):
await queue.put(sentinel)
await queue.join()
async def pipeline():
raw_queue = asyncio.queue()
processed_queue = asyncio.queue()
# stage 1: 多个 fetcher 往 raw_queue 放数据
fetchers = [asyncio.create_task(fetch(raw_queue)) for _ in range(5)]
# stage 2: 多个 processor 从 raw_queue 取出,处理后放入 processed_queue
processors = [asyncio.create_task(process(raw_queue, processed_queue)) for _ in range(3)]
# stage 3: 单个 writer 从 processed_queue 取出并写入
writer = asyncio.create_task(write(processed_queue))
maxsize=0 时生产速度远大于消费速度会导致内存暴涨,生产环境应始终设置合理的 maxsizetask_done():会导致 join() 永远阻塞task_done() 仍被调用(用 try/finally):async def safe_consumer(queue):
while true:
item = await queue.get()
try:
await process(item)
finally:
queue.task_done()
call_soon_threadsafe 或 run_coroutine_threadsafeasyncio.queue 是异步编程中不可或缺的协调原语。它的设计简洁优雅——底层通过 future 实现挂起/唤醒,通过 deque 保证 fifo 公平性,通过 _unfinished_tasks + event 实现 join 语义。理解它的内部机制,能帮助你在构建异步任务调度器、流水线处理、背压控制等场景中做出更好的架构决策。
到此这篇关于python中asyncio.queue异步队列的实现的文章就介绍到这了,更多相关python asyncio.queue异步队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论