佇列 (Queues)¶
asyncio 佇列被設計成與 queue
模組類似。儘管 asyncio 佇列不支援執行緒安全 (thread-safe),但他們是被設計來專用於 async/await 程式。
注意 asyncio 的佇列沒有 timeout 參數;請使用 asyncio.wait_for()
函式來為佇列新增具有超時 (timeout) 設定的操作。
另請參閱下方 Examples。
Queue¶
- class asyncio.Queue(maxsize=0)¶
先進先出 (FIFO) 佇列。
如果 maxsize 小於或等於零,則佇列大小是無限制的。如果是大於
0
的整數,則當佇列達到 maxsize 時,await put()
將會阻塞 (block),直到某個元素被get()
取出。不像標準函式庫中執行緒類型的
queue
,佇列的大小一直是已知的,可以透過呼叫qsize()
方法回傳。在 3.10 版的變更: 移除 loop 參數。
這個類別是不支援執行緒安全的。
- maxsize¶
佇列中可存放的元素數量。
- empty()¶
如果佇列為空則回傳
True
,否則回傳False
。
- async get()¶
從佇列中刪除並回傳一個元素。如果佇列為空,則持續等待直到佇列中有元素。
Raises
QueueShutDown
if the queue has been shut down and is empty, or if the queue has been shut down immediately.
- get_nowait()¶
如果佇列內有值則立即回傳佇列中的元素,否則引發
QueueEmpty
。
- async join()¶
持續阻塞直到佇列中所有的元素都被接收和處理完畢。
當條目新增到佇列的時候,未完成任務的計數就會增加。每當一個消耗者 (consumer) 協程呼叫
task_done()
,表示這個條目已經被取回且被它包含的所有工作都已完成,未完成任務計數就會減少。當未完成計數降到零的時候,join()
阻塞會被解除 (unblock)。
- async put(item)¶
將一個元素放進佇列。如果佇列滿了,在新增元素之前,會持續等待直到有空閒插槽 (free slot) 能被使用。
如果佇列已經被關閉,則引發
QueueShutDown
。
- qsize()¶
回傳佇列中的元素數量。
- shutdown(immediate=False)¶
Put a
Queue
instance into a shutdown mode.The queue can no longer grow. Future calls to
put()
raiseQueueShutDown
. Currently blocked callers ofput()
will be unblocked and will raiseQueueShutDown
in the formerly blocked thread.If immediate is false (the default), the queue can be wound down normally with
get()
calls to extract tasks that have already been loaded.And if
task_done()
is called for each remaining task, a pendingjoin()
will be unblocked normally.Once the queue is empty, future calls to
get()
will raiseQueueShutDown
.If immediate is true, the queue is terminated immediately. The queue is drained to be completely empty and the count of unfinished tasks is reduced by the number of tasks drained. If unfinished tasks is zero, callers of
join()
are unblocked. Also, blocked callers ofget()
are unblocked and will raiseQueueShutDown
because the queue is empty.Use caution when using
join()
with immediate set to true. This unblocks the join even when no work has been done on the tasks, violating the usual invariant for joining a queue.在 3.13 版被加入.
- task_done()¶
表示前面一個排隊的工作項目已經完成。
由佇列消耗者使用。對於每個用於取得一個工作項目的
get()
,接續的task_done()
呼叫會告訴佇列這個工作項目的處理已經完成。如果
join()
目前正在阻塞,在所有項目都被處理後會解除阻塞(意味著每個以put()
放進佇列的條目都會收到一個task_done()
)。如果被呼叫的次數多於放入佇列中的項目數量,將引發
ValueError
。
Priority Queue(優先佇列)¶
LIFO Queue¶
例外¶
- exception asyncio.QueueEmpty¶
當佇列為空的時候,呼叫
get_nowait()
方法會引發這個例外。
- exception asyncio.QueueFull¶
當佇列中條目數量已經達到它的 maxsize 時,呼叫
put_nowait()
方法會引發這個例外。
範例¶
佇列能被用於多個並行任務的工作分配:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())