Creates a new stream with the same events as this stream.
Whenever more than timeLimit
passes between two events from this stream,
the onTimeout
function is called, which can emit further events on
the returned stream.
The countdown doesn't start until the returned stream is listened to. The countdown is reset every time an event is forwarded from this stream, or when this stream is paused and resumed.
The onTimeout
function is called with one argument: an
EventSink that allows putting events into the returned stream.
This EventSink is only valid during the call to onTimeout.
Calling EventSink.close on the sink passed to onTimeout
closes the
returned stream, and no further events are processed.
If onTimeout
is omitted, a timeout will just put a TimeoutException
into the error channel of the returned stream.
If the call to onTimeout
throws, the error is emitted on the returned
stream.
The returned stream is a broadcast stream if this stream is. If a broadcast stream is listened to more than once, each subscription will have its own individual timer that starts counting on listen, and the subscriptions' timers can be paused individually.
Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) {
  // Implementation overview: events from this stream are forwarded through a
  // synchronous controller, and a timer created in the listener's zone is
  // restarted around every forwarded data/error event. When the timer fires,
  // either `onTimeout` is run with a temporary sink onto the controller, or a
  // [TimeoutException] is added as an error.
  _StreamControllerBase<T> controller;
  // The following variables are set on listen.
  StreamSubscription<T> subscription;
  Timer timer;
  Zone zone;
  _TimerCallback timeout;

  void onData(T event) {
    timer.cancel();
    // The controller is synchronous, so `add` can run listener code that
    // cancels or pauses the returned stream, which in turn cancels `timer`.
    // Restart the timer *before* forwarding the event so that such a
    // cancel/pause stops the new timer instead of leaving it running.
    timer = zone.createTimer(timeLimit, timeout);
    controller.add(event);
  }

  void onError(error, StackTrace stackTrace) {
    timer.cancel();
    assert(controller is _StreamController ||
        controller is _BroadcastStreamController);
    // Same ordering as in `onData`: restart the timer before the synchronous
    // delivery, in case the listener cancels or pauses while handling it.
    timer = zone.createTimer(timeLimit, timeout);
    controller._addError(error, stackTrace); // Avoid Zone error replacement.
  }

  void onDone() {
    timer.cancel();
    controller.close();
  }

  void onListen() {
    // This is the onListen callback of the controller.
    // It runs in the same zone that the subscription was created in.
    // Use that zone for creating timers and running the onTimeout
    // callback.
    zone = Zone.current;
    if (onTimeout == null) {
      // Default behavior: report the timeout on the error channel.
      timeout = () {
        controller.addError(
            new TimeoutException("No stream event", timeLimit), null);
      };
    } else {
      // TODO(floitsch): the return type should be 'void', and the type
      // should be inferred.
      var registeredOnTimeout =
          zone.registerUnaryCallback<dynamic, EventSink<T>>(onTimeout);
      // The wrapper is only connected to the controller while `onTimeout`
      // is running; outside of that window its sink is null.
      var wrapper = new _ControllerEventSinkWrapper<T>(null);
      timeout = () {
        wrapper._sink = controller; // Only valid during call.
        zone.runUnaryGuarded(registeredOnTimeout, wrapper);
        wrapper._sink = null;
      };
    }
    subscription = this.listen(onData, onError: onError, onDone: onDone);
    timer = zone.createTimer(timeLimit, timeout);
  }

  Future onCancel() {
    timer.cancel();
    Future result = subscription.cancel();
    subscription = null;
    return result;
  }

  controller = isBroadcast
      ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
      : new _SyncStreamController<T>(onListen, () {
          // onPause: stop the countdown while the returned stream is paused.
          // Don't null the timer, onCancel may call cancel again.
          timer.cancel();
          subscription.pause();
        }, () {
          // onResume: resume the source and start a fresh countdown.
          subscription.resume();
          timer = zone.createTimer(timeLimit, timeout);
        }, onCancel);
  return controller.stream;
}