StreamQueue<T> class

An asynchronous pull-based interface for accessing stream events.

Wraps a stream and makes individual events available on request.

You can request (and reserve) one or more events from the stream, and after all previous requests have been fulfilled, stream events go towards fulfilling your request.

For example, if you ask for next two times, the returned futures will be completed by the next two unrequested events from the stream.

The stream subscription is paused when there are no active requests.

Some streams, including broadcast streams, will buffer events while paused, so waiting too long between requests may cause memory bloat somewhere else.

This is similar to, but more convenient than, a StreamIterator. A StreamIterator requires you to manually check when a new event is available and you can only access the value of that event until you check for the next one. A StreamQueue allows you to request, for example, three events at a time, either individually, as a group using take or skip, or in any combination.

You can also ask to have the rest of the stream provided as a new stream. This allows, for example, taking the first event out of a stream and continuing to use the rest of the stream as a stream.

Example:

var events = new StreamQueue<String>(someStreamOfLines);
var first = await events.next;
while (first.startsWith('#')) {
  // Skip comments.
  first = await events.next;
}

if (first.startsWith(MAGIC_MARKER)) {
  var headerCount =
      first.parseInt(first.substring(MAGIC_MARKER.length + 1));
  handleMessage(headers: await events.take(headerCount),
                body: events.rest);
  return;
}
// Error handling.

When you need no further events the StreamQueue should be closed using cancel. This releases the underlying stream subscription.

Constructors

StreamQueue(Stream<T> source)
Create a StreamQueue of the events of source.
factory

Properties

eventsDispatched int
The number of events dispatched by this queue. [...]
read-only
hasNext Future<bool>
Asks if the stream has any more events. [...]
read-only
next Future<T>
Requests the next (yet unrequested) event from the stream. [...]
read-only
peek Future<T>
Looks at the next (yet unrequested) event from the stream. [...]
read-only
rest Stream<T>
Returns a stream of all the remaning events of the source stream. [...]
read-only
hashCode int
The hash code for this object. [...]
read-only, inherited
runtimeType Type
A representation of the runtime type of the object.
read-only, inherited

Methods

cancel({bool immediate: false }) Future
Cancels the underlying event source. [...]
cancelable<S>(Future<S> callback(StreamQueue<T> queue)) CancelableOperation<S>
Passes a copy of this queue to callback, and updates this queue to match the copy's position once callback completes. [...]
lookAhead(int count) Future<List<T>>
Look at the next count data events without consuming them. [...]
skip(int count) Future<int>
Skips the next count data events. [...]
startTransaction() StreamQueueTransaction<T>
Requests a transaction that can conditionally consume events. [...]
take(int count) Future<List<T>>
Requests the next count data events as a list. [...]
withTransaction(Future<bool> callback(StreamQueue<T> queue)) Future<bool>
Passes a copy of this queue to callback, and updates this queue to match the copy's position if callback returns true. [...]
noSuchMethod(Invocation invocation) → dynamic
Invoked when a non-existent method or property is accessed. [...]
inherited
toString() String
Returns a string representation of this object.
inherited

Operators

operator ==(dynamic other) bool
The equality operator. [...]
inherited