Transforms each element into a sequence of asynchronous events.
Returns a new stream and, for each event of this stream, does the following:
The convert function is called
with the element as argument to produce a convert-stream for the element.
If that call returns null, no further action is taken for the elements.
The returned stream is a broadcast stream if this stream is.
/// Expands every data event of this stream into the events of a stream
/// produced by [convert].
///
/// This stream is listened to from the returned stream's `onListen`
/// callback. For each data event, [convert] is invoked with that event:
///
/// * if [convert] throws, the error and stack trace are added to the
///   returned stream and nothing else is done for the event;
/// * if [convert] returns `null`, nothing is emitted for the event;
/// * otherwise the subscription on this stream is paused, the produced
///   stream is added to the returned stream, and the subscription is
///   resumed once that addition completes.
///
/// Error events of this stream are forwarded to the returned stream, and
/// the returned stream is closed when this stream is done. The returned
/// stream is a broadcast stream exactly when this stream is.
Stream<E> asyncExpand<E>(Stream<E> convert(T event)) {
  _StreamControllerBase<E> output;
  StreamSubscription<T> source;

  // Handles a single data event coming from this stream.
  void expandEvent(T event) {
    Stream<E> expansion;
    try {
      expansion = convert(event);
    } catch (error, stackTrace) {
      // A throwing [convert] is reported on the output; the event is dropped.
      output.addError(error, stackTrace);
      return;
    }
    // A null result means there is nothing to emit for this event.
    if (expansion == null) return;
    // Hold back further source events until the expansion has been added
    // to the output completely.
    source.pause();
    output.addStream(expansion).whenComplete(source.resume);
  }

  // Subscribes to this stream; used as the output controller's onListen.
  void startListening() {
    assert(
        output is _StreamController || output is _BroadcastStreamController);
    source = listen(expandEvent,
        onError: output._addError, // Avoid Zone error replacement.
        onDone: output.close);
  }

  // The callbacks below are closures rather than tear-offs because `source`
  // is only assigned once startListening has run.
  if (isBroadcast) {
    output = new StreamController<E>.broadcast(
        onListen: startListening,
        onCancel: () {
          source.cancel();
        },
        sync: true);
  } else {
    output = new StreamController<E>(
        onListen: startListening,
        onPause: () {
          source.pause();
        },
        onResume: () {
          source.resume();
        },
        // Arrow form: the result of cancel() is handed back to the controller.
        onCancel: () => source.cancel(),
        sync: true);
  }
  return output.stream;
}