tornado.queues
– Queues for coroutines¶
New in version 4.2.
Classes¶
Queue¶
-
class
tornado.queues.
Queue
(maxsize=0)[source]¶ Coordinate producer and consumer coroutines.
If maxsize is 0 (the default) the queue size is unbounded.
from tornado import gen from tornado.ioloop import IOLoop from tornado.queues import Queue q = Queue(maxsize=2) @gen.coroutine def consumer(): while True: item = yield q.get() try: print('Doing work on %s' % item) yield gen.sleep(0.01) finally: q.task_done() @gen.coroutine def producer(): for item in range(5): yield q.put(item) print('Put %s' % item) @gen.coroutine def main(): # Start consumer without waiting (since it never finishes). IOLoop.current().spawn_callback(consumer) yield producer() # Wait for producer to put all tasks. yield q.join() # Wait for consumer to finish all tasks. print('Done') IOLoop.current().run_sync(main)
Put 0 Put 1 Doing work on 0 Put 2 Doing work on 1 Put 3 Doing work on 2 Put 4 Doing work on 3 Doing work on 4 Done
In Python 3.5,
Queue
implements the async iterator protocol, soconsumer()
could be rewritten as:async def consumer(): async for item in q: try: print('Doing work on %s' % item) yield gen.sleep(0.01) finally: q.task_done()
Changed in version 4.3: Added
async for
support in Python 3.5.-
maxsize
¶ Number of items allowed in the queue.
-
put
(item, timeout=None)[source]¶ Put an item into the queue, perhaps waiting until there is room.
Returns a Future, which raises
tornado.gen.TimeoutError
after a timeout.
-
put_nowait
(item)[source]¶ Put an item into the queue without blocking.
If no free slot is immediately available, raise
QueueFull
.
-
get
(timeout=None)[source]¶ Remove and return an item from the queue.
Returns a Future which resolves once an item is available, or raises
tornado.gen.TimeoutError
after a timeout.
-
get_nowait
()[source]¶ Remove and return an item from the queue without blocking.
Return an item if one is immediately available, else raise
QueueEmpty
.
-
task_done
()[source]¶ Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each
get
used to fetch a task, a subsequent call totask_done
tells the queue that the processing on the task is complete.If a
join
is blocking, it resumes when all items have been processed; that is, when everyput
is matched by atask_done
.Raises
ValueError
if called more times thanput
.
-
join
(timeout=None)[source]¶ Block until all items in the queue are processed.
Returns a Future, which raises
tornado.gen.TimeoutError
after a timeout.
-
PriorityQueue¶
-
class
tornado.queues.
PriorityQueue
(maxsize=0)[source]¶ A
Queue
that retrieves entries in priority order, lowest first.Entries are typically tuples like
(priority number, data)
.from tornado.queues import PriorityQueue q = PriorityQueue() q.put((1, 'medium-priority item')) q.put((0, 'high-priority item')) q.put((10, 'low-priority item')) print(q.get_nowait()) print(q.get_nowait()) print(q.get_nowait())
(0, 'high-priority item') (1, 'medium-priority item') (10, 'low-priority item')
Exceptions¶
QueueEmpty¶
-
exception
tornado.queues.
QueueEmpty
[source]¶ Raised by
Queue.get_nowait
when the queue has no items.
QueueFull¶
-
exception
tornado.queues.
QueueFull
[source]¶ Raised by
Queue.put_nowait
when a queue is at its maximum size.