Creates a new stream with each data event of this stream asynchronously mapped to a new event.
This acts like map, except that convert
may return a Future,
and in that case, this stream waits for that future to complete before
continuing with its result.
The returned stream is a broadcast stream if this stream is.
Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) {
  // Both variables are assigned after the callbacks below are created: the
  // controller further down in this method, the subscription only when the
  // returned stream is listened to. The callbacks therefore read them lazily
  // through closures rather than tearing methods off up front.
  _StreamControllerBase<E> controller;
  StreamSubscription<T> subscription;

  // Forwards a pause of the returned stream to the source subscription.
  void pauseSource() {
    subscription.pause();
  }

  // Forwards a resume of the returned stream to the source subscription.
  void resumeSource() {
    subscription.resume();
  }

  // Runs when the returned stream is listened to: subscribes to this stream
  // and wires converted events, errors and done into the controller.
  void startListening() {
    assert(controller is _StreamController<E> ||
        controller is _BroadcastStreamController);
    final emit = controller.add;
    // NOTE(review): errors from the source and from converted futures go
    // through the private `_addError`, while errors thrown synchronously by
    // `convert` go through the public `addError` — the reason for the
    // difference is not visible here; confirm before unifying them.
    final emitError = controller._addError;

    void handleEvent(T event) {
      FutureOr<E> converted;
      try {
        converted = convert(event);
      } catch (error, stackTrace) {
        // A synchronous throw from `convert` becomes an error event and the
        // data event is dropped.
        controller.addError(error, stackTrace);
        return;
      }
      if (converted is Future<E>) {
        // Pause the source while the future is pending, forward its value or
        // error once it completes, then resume the source either way.
        subscription.pause();
        final forwarded = converted.then(emit, onError: emitError);
        forwarded.whenComplete(subscription.resume);
      } else {
        // Plain value: emit it immediately.
        controller.add(converted);
      }
    }

    subscription =
        listen(handleEvent, onError: emitError, onDone: controller.close);
  }

  // The returned stream is a broadcast stream exactly when this stream is.
  // Only the single-subscription controller gets pause/resume callbacks, and
  // only its cancel callback returns the result of cancelling the source.
  if (isBroadcast) {
    controller = new StreamController<E>.broadcast(
        onListen: startListening,
        onCancel: () {
          subscription.cancel();
        },
        sync: true);
  } else {
    controller = new StreamController<E>(
        onListen: startListening,
        onPause: pauseSource,
        onResume: resumeSource,
        onCancel: () => subscription.cancel(),
        sync: true);
  }
  return controller.stream;
}