協(xié)程的異步隊(duì)列。 這些類與標(biāo)準(zhǔn)庫的 ?asyncio
包中提供的類非常相似。
注意:
與標(biāo)準(zhǔn)庫的 ?queue
模塊不同,這里定義的類不是線程安全的。 要從另一個(gè)線程使用這些隊(duì)列,請(qǐng)?jiān)谡{(diào)用任何隊(duì)列方法之前使用 ?IOLoop.add_callback
? 將控制權(quán)轉(zhuǎn)移到 ?IOLoop
線程。
協(xié)調(diào)生產(chǎn)者和消費(fèi)者協(xié)程。
如果 ?maxsize
為 0(默認(rèn)值),則隊(duì)列大小是無限的。
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue
q = Queue(maxsize=2)
async def consumer():
async for item in q:
try:
print('Doing work on %s' % item)
await gen.sleep(0.01)
finally:
q.task_done()
async def producer():
for item in range(5):
await q.put(item)
print('Put %s' % item)
async def main():
# Start consumer without waiting (since it never finishes).
IOLoop.current().spawn_callback(consumer)
await producer() # Wait for producer to put all tasks.
await q.join() # Wait for consumer to finish all tasks.
print('Done')
IOLoop.current().run_sync(main)
結(jié)果如下:
Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done
在沒有原生協(xié)程的 Python 版本中(3.5 之前),?consumer()
? 可以寫成:
@gen.coroutine
def consumer():
while True:
item = yield q.get()
try:
print('Doing work on %s' % item)
yield gen.sleep(0.01)
finally:
q.task_done()
隊(duì)列中允許的項(xiàng)目數(shù)。
隊(duì)列中的項(xiàng)目數(shù)。
將一個(gè)項(xiàng)目放入隊(duì)列中,也許等到有空間。
返回一個(gè) ?Future
?,它會(huì)在超時(shí)后引發(fā) ?tornado.util.TimeoutError
?。
?timeout
可以是一個(gè)表示時(shí)間的數(shù)字(與 ?tornado.ioloop.IOLoop.time
? 的比例相同,通常是 ?time.time
?),或者是相對(duì)于當(dāng)前時(shí)間的截止日期的 ?datetime.timedelta
? 對(duì)象。
將一個(gè)項(xiàng)目放入隊(duì)列而不阻塞。
如果沒有立即可用的空閑槽,則提高 ?QueueFull
?。
從隊(duì)列中移除并返回一個(gè)項(xiàng)目。
返回一個(gè)等待項(xiàng)目,一旦項(xiàng)目可用就解決,或在超時(shí)后引發(fā) ?tornado.util.TimeoutError
?。
?timeout
可以是一個(gè)表示時(shí)間的數(shù)字(與 ?tornado.ioloop.IOLoop.time
? 的比例相同,通常是?time.time
?),或者是相對(duì)于當(dāng)前時(shí)間的截止日期的 ?datetime.timedelta
? 對(duì)象。
注意:
該方法的 ?timeout
參數(shù)與標(biāo)準(zhǔn)庫的 ?queue.Queue.get
? 不同。 該方法將數(shù)值解釋為相對(duì)超時(shí); 這將它們解釋為絕對(duì)截止日期,并且需要 ?timedelta
對(duì)象用于相對(duì)超時(shí)(與 Tornado 中的其他超時(shí)一致)。
從隊(duì)列中移除并返回一個(gè)項(xiàng)目而不阻塞。
如果一個(gè)項(xiàng)目立即可用,則返回一個(gè)項(xiàng)目,否則引發(fā) ?QueueEmpty
?。
指示以前排隊(duì)的任務(wù)已完成。
由隊(duì)列消費(fèi)者使用。 對(duì)用于獲取任務(wù)的每個(gè) ?get
?,對(duì) ?task_done
的后續(xù)調(diào)用會(huì)告訴隊(duì)列該任務(wù)的處理已完成。
如果一個(gè)連接被阻塞,它會(huì)在所有項(xiàng)目都被處理后恢復(fù); 也就是說,當(dāng)每個(gè) ?put
?都與 ?task_done
?匹配時(shí)。
如果調(diào)用次數(shù)多于 ?put
?,則引發(fā) ?ValueError
?。
阻塞直到隊(duì)列中的所有項(xiàng)目都處理完畢。
返回一個(gè) ?awaitable
?,它在超時(shí)后引發(fā) ?tornado.util.TimeoutError
?。
按優(yōu)先級(jí)順序檢索條目的隊(duì)列,最低優(yōu)先。
條目通常是元組,如(?priority number
?, ?data
?)。
from tornado.queues import PriorityQueue
q = PriorityQueue()
q.put((1, 'medium-priority item'))
q.put((0, 'high-priority item'))
q.put((10, 'low-priority item'))
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
結(jié)果如下:
(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')
隊(duì)列首先檢索最近放置的項(xiàng)目。
from tornado.queues import LifoQueue
q = LifoQueue()
q.put(3)
q.put(2)
q.put(1)
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
結(jié)果如下:
1
2
3
當(dāng)隊(duì)列沒有項(xiàng)目時(shí)由 ?Queue.get_nowait
? 引發(fā)。
當(dāng)隊(duì)列達(dá)到最大大小時(shí)由 ?Queue.put_nowait
? 引發(fā)。
更多建議: