queue --- 同步佇列 (synchronized queue) 類別¶
原始碼:Lib/queue.py
queue module(模組)實作多生產者、多消費者佇列。在執行緒程式設計中,必須在多執行緒之間安全地交換資訊時,特別有用。此 module 中的 Queue class 實作所有必需的鎖定語義 (locking semantics)。
此 module 實作三種型別的佇列,它們僅在取出條目的順序上有所不同。在 FIFO 佇列中,先加入的任務是第一個被取出的。在 LIFO 佇列中,最近被加入的條目是第一個被取出的(像堆疊 (stack) 一樣操作)。使用優先佇列 (priority queue) 時,條目將保持排序狀態(使用 heapq module),並先取出最低值條目。
在內部,這三種型別的佇列使用鎖 (lock) 來暫時阻塞競爭執行緒;但是,它們並不是被設計來處理執行緒內的 reentrancy(可重入)。
此外,此 module 實作一個「簡單」的 FIFO 佇列型別 SimpleQueue,其特定的實作是以較少的功能為代價,來提供額外的保證。
queue module 定義了以下的 class 和例外:
- class queue.Queue(maxsize=0)¶
FIFO 佇列的建構子 (constructor)。maxsize 是一個整數,用於設置佇列中可放置的項目數的上限。一旦達到此大小,插入將會阻塞,直到佇列中的項目被消耗。如果 maxsize 小於或等於零,則佇列大小為無限。
- class queue.LifoQueue(maxsize=0)¶
LIFO 佇列的建構子。maxsize 是一個整數,用於設置佇列中可放置的項目數的上限。一旦達到此大小,插入將被鎖定,到佇列中的項目被消耗。如果 maxsize 小於或等於零,則佇列大小為無限。
- class queue.PriorityQueue(maxsize=0)¶
優先佇列的建構子。maxsize 是一個整數,用於設置佇列中可放置的項目數的上限。一旦達到此大小,插入將被阻塞,直到佇列中的項目被消耗。如果 maxsize 小於或等於零,則佇列大小為無限。
最低值的條目會最先被取出(最低值的條目是被會
min(entries)回傳的那一個)。條目的典型模式是格式為(priority_number, data)的 tuple(元組)。如果 data 元素為不可比較的,則可以將資料包裝在一個 class 中,該 class 忽略資料項目並僅比較優先數:
from dataclasses import dataclass, field from typing import Any @dataclass(order=True) class PrioritizedItem: priority: int item: Any=field(compare=False)
- class queue.SimpleQueue¶
無界的 FIFO 佇列的建構子。簡單佇列缺少任務追蹤等進階功能。
在 3.7 版被加入.
- exception queue.Empty¶
當對一個空的
Queue物件呼叫非阻塞的 (non-blocking)get()(或get_nowait())將引發此例外。
- exception queue.Full¶
當對一個已滿的
Queue物件呼叫非阻塞的put()(或put_nowait())將引發此例外。
- exception queue.ShutDown¶
Exception raised when
put()orget()is called on aQueueobject which has been shut down.在 3.13 版被加入.
佇列物件¶
佇列物件(Queue、LifoQueue、PriorityQueue)提供下面描述的公用 method。
- Queue.qsize()¶
回傳佇列的近似大小。注意,qsize() > 0 並不能保證後續的 get() 不會阻塞,qsize() < maxsize 也不會保證 put() 不會阻塞。
- Queue.empty()¶
如果佇列為空,則回傳
True,否則回傳False。如果 empty() 回傳True,則不保證後續呼叫 put() 不會阻塞。同樣,如果 empty() 回傳False,則不保證後續呼叫 get() 不會阻塞。
- Queue.full()¶
如果佇列已滿,則回傳
True,否則回傳False。如果 full() 回傳True,則不保證後續呼叫 get() 不會阻塞。同樣,如果 full() 回傳False,則不保證後續呼叫 put() 不會阻塞。
- Queue.put(item, block=True, timeout=None)¶
將 item 放入佇列中。如果可選的 args block 為 true、timeout 為
None(預設值),則在必要時阻塞,直到自由槽 (free slot) 可用。如果 timeout 為正數,則最多阻塞 timeout 秒,如果該時間內沒有可用的自由槽,則會引發Full例外。否則(block 為 false),如果自由槽立即可用,則將項目放在佇列中,否則引發Full例外(在這種情況下,timeout 將被忽略)。Raises
ShutDownif the queue has been shut down.
- Queue.put_nowait(item)¶
等效於
put(item, block=False)。
- Queue.get(block=True, timeout=None)¶
從佇列中移除並回傳一個項目。如果可選的 args block 為 true,且 timeout 為
None(預設值),則在必要時阻塞,直到有可用的項目。如果 timeout 是正數,則最多會阻塞 timeout 秒,如果該時間內沒有可用的項目,則會引發Empty例外。否則(block 為 false),如果立即可用,則回傳一個項目,否則引發Empty例外(在這種情況下,timeout 將被忽略)。在 POSIX 系統的 3.0 版之前,以及 Windows 的所有版本,如果 block 為 true 且 timeout 為
None,則此操作將在底層鎖上進入不間斷等待。這意味著不會發生例外,特別是 SIGINT(中斷訊號)不會觸發KeyboardInterrupt。Raises
ShutDownif the queue has been shut down and is empty, or if the queue has been shut down immediately.
- Queue.get_nowait()¶
等效於
get(False)。
有兩個 method 可以支援追蹤放入佇列的任務是否已由常駐消費者執行緒 (daemon consumer threads) 完全處理。
- Queue.task_done()¶
表示先前放入佇列的任務已完成。由佇列消費者執行緒使用。對於用來提取任務的每個
get(),隨後呼叫task_done()告訴佇列任務的處理已完成。如果目前
join()阻塞,它將會在所有項目都已處理完畢後恢復(代表對於以put()放進佇列的每個項目,都要收到task_done()的呼叫)。shutdown(immediate=True)callstask_done()for each remaining item in the queue.如果呼叫次數超過佇列中放置的項目數量,則引發
ValueError。
- Queue.join()¶
持續阻塞直到佇列中的所有項目都已被取得並處理完畢。
每當項目被加到佇列中時,未完成任務的計數都會增加。每當消費者執行緒呼叫
task_done()以指示該項目已被取出並且對其的所有工作都已完成時,計數就會下降。當未完成任務的計數降至零時,join()將停止阻塞。
如何等待放入佇列的任務完成的範例:
import threading
import queue
q = queue.Queue()
def worker():
while True:
item = q.get()
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()
# Send thirty task requests to the worker.
for item in range(30):
q.put(item)
# Block until all tasks are done.
q.join()
print('All work completed')
Terminating queues¶
Queue objects can be made to prevent further interaction by shutting
them down.
- Queue.shutdown(immediate=False)¶
Shut down the queue, making
get()andput()raiseShutDown.By default,
get()on a shut down queue will only raise once the queue is empty. Set immediate to true to makeget()raise immediately instead.All blocked callers of
put()andget()will be unblocked. If immediate is true, a task will be marked as done for each remaining item in the queue, which may unblock callers ofjoin().在 3.13 版被加入.
SimpleQueue 物件¶
SimpleQueue 物件提供下面描述的公用 method。
- SimpleQueue.qsize()¶
傳回佇列的近似大小。注意,qsize() > 0 並不能保證後續的 get() 不會阻塞。
- SimpleQueue.empty()¶
如果佇列為空,則回傳
True,否則回傳False。如果 empty() 回傳False,則不保證後續呼叫 get() 不會阻塞。
- SimpleQueue.put(item, block=True, timeout=None)¶
將 item 放入佇列中。此 method 從不阻塞,並且都會成功(除了潛在的低階錯誤,像是分配記憶體失敗)。可選的 args block 和 timeout 會被忽略,它們僅是為了與
Queue.put()相容才存在。此 method 有一個可重入 (reentrant) 的 C 實作。意思就是,一個
put()或get()呼叫,可以被同一執行緒中的另一個put()呼叫中斷,而不會造成死鎖 (deadlock) 或損壞佇列中的內部狀態。這使得它適合在解構子 (destructor) 中使用,像是__del__method 或weakref回呼函式 (callback)。
- SimpleQueue.put_nowait(item)¶
等效於
put(item, block=False),用於與Queue.put_nowait()相容。
- SimpleQueue.get(block=True, timeout=None)¶
從佇列中移除並回傳一個項目。如果可選的 args block 為 true,且 timeout 為
None(預設值),則在必要時阻塞,直到有可用的項目。如果 timeout 是正數,則最多會阻塞 timeout 秒,如果該時間內沒有可用的項目,則會引發Empty例外。否則(block 為 false),如果立即可用,則回傳一個項目,否則引發Empty例外(在這種情況下,timeout 將被忽略)。
- SimpleQueue.get_nowait()¶
等效於
get(False)。
也參考
- Class
multiprocessing.Queue 用於多行程處理 (multi-processing)(而非多執行緒)情境 (context) 的佇列 class。
collections.deque 是無界佇列的替代實作,有快速且具原子性 (atomic) 的 append() 和 popleft() 操作,這些操作不需要鎖定,並且還支持索引。