佇列 (Queues)¶
asyncio 佇列被設計成與 queue
模組類似。儘管 asyncio 佇列不支援執行緒安全 (thread-safe),但他們是被設計來專用於 async/await 程式。
注意 asyncio 的佇列沒有 timeout 參數;請使用 asyncio.wait_for()
函式來為佇列新增具有超時 (timeout) 設定的操作。
另請參閱下方 Examples。
Queue¶
- class asyncio.Queue(maxsize=0)¶
先進先出 (FIFO) 佇列。
如果 maxsize 小於或等於零,則佇列尺寸是無限制的。如果是大於
0
的整數,則當佇列達到 maxsize 時,await put()
將會阻塞 (block),直到某個元素被get()
取出。不像標準函式庫中執行緒類型的
queue
,佇列的尺寸一直是已知的,可以透過呼叫qsize()
方法回傳。在 3.10 版的變更: 移除 loop 參數。
這個類別是不支援執行緒安全的。
- maxsize¶
佇列中可存放的元素數量。
- empty()¶
如果佇列為空則回傳
True
,否則回傳False
。
- coroutine get()¶
從佇列中刪除並回傳一個元素。如果佇列為空,則持續等待直到佇列中有元素。
Raises
QueueShutDown
if the queue has been shut down and is empty, or if the queue has been shut down immediately.
- get_nowait()¶
如果佇列內有值則立即回傳佇列中的元素,否則引發
QueueEmpty
。
- coroutine join()¶
持續阻塞直到佇列中所有的元素都被接收和處理完畢。
當條目新增到佇列的時候,未完成任務的計數就會增加。每當一個消耗者 (consumer) 協程呼叫
task_done()
,表示這個條目已經被取回且被它包含的所有工作都已完成,未完成任務計數就會減少。當未完成計數降到零的時候,join()
阻塞會被解除 (unblock)。
- coroutine put(item)¶
將一個元素放進佇列。如果佇列滿了,在新增元素之前,會持續等待直到有空閒插槽 (free slot) 能被使用。
Raises
QueueShutDown
if the queue has been shut down.
- qsize()¶
回傳佇列中的元素數量。
- shutdown(immediate=False)¶
Shut down the queue, making
get()
andput()
raiseQueueShutDown
.By default,
get()
on a shut down queue will only raise once the queue is empty. Set immediate to true to makeget()
raise immediately instead.All blocked callers of
put()
andget()
will be unblocked. If immediate is true, a task will be marked as done for each remaining item in the queue, which may unblock callers ofjoin()
.在 3.13 版被加入.
- task_done()¶
表示前面一個排隊的任務已經完成。
由佇列消耗者使用。對於每個用於獲取一個任務的
get()
,接續的task_done()
呼叫會告訴佇列這個任務的處理已經完成。如果
join()
當前正在阻塞,在所有項目都被處理後會解除阻塞(意味著每個以put()
放進佇列的條目都會收到一個task_done()
)。shutdown(immediate=True)
callstask_done()
for each remaining item in the queue.如果被呼叫的次數多於放入佇列中的項目數量,將引發
ValueError
。
Priority Queue(優先佇列)¶
LIFO Queue¶
例外¶
- exception asyncio.QueueEmpty¶
當佇列為空的時候,呼叫
get_nowait()
方法會引發這個例外。
- exception asyncio.QueueFull¶
當佇列中條目數量已經達到它的 maxsize 時,呼叫
put_nowait()
方法會引發這個例外。
範例¶
佇列能被用於多個並行任務的工作分配:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())