I'm using combineLatest2 in RxDart, but I'm still confused about how it behaves. This is my code:
// Forwards non-empty strings unchanged; for an empty string it adds an error
// event to the output sink instead of the value.
final validator = StreamTransformer<String, String>.fromHandlers(
  handleData: (data, sink) {
    if (data.isEmpty) {
      sink.addError('Cannot be empty.');
    } else {
      sink.add(data);
    }
  },
);

// First input: the subject, its validated stream, and a tear-off of its
// sink's `add` used to push new values.
final _subject1 = BehaviorSubject<String>();
final stream1 = _subject1.stream.transform(validator);
final changeSubject1 = _subject1.sink.add;

// Second input, wired up the same way as the first.
final _subject2 = BehaviorSubject<String>();
final stream2 = _subject2.stream.transform(validator);
final changeSubject2 = _subject2.sink.add;

// Joins the two validated streams into a single "a, b" string.
final combined = Observable.combineLatest2(
  stream1,
  stream2,
  (a, b) => '$a, $b',
);
I have two subjects, and for each one I keep a reference to its stream and to its sink.add function. Before assigning each stream to its variable, I apply a transformer that ensures only non-empty strings are emitted; otherwise an error is added to the sink. Finally, I create another stream, combined, by combining the first two using Observable.combineLatest2.
The combined stream emits only once each of its "children" streams has emitted a valid value. The issue I'm facing occurs when both streams have already emitted valid values, then both have emitted invalid values, and then one of them emits a valid value again. Interestingly, on that final emission the combined stream emits the new value of the newly updated stream together with the previous valid value of the other one (even though an invalid value has already been emitted after that previous valid one). Can I prevent this from happening? In other words, running this code:
// Print every combined value; print a fixed marker for any error event.
combined.listen((data) => print(data), onError: (error) => print('error'));
// Both inputs valid.
changeSubject1('Hello');
changeSubject2('World');
// Both inputs become invalid (empty strings are rejected by the validator).
changeSubject1('');
changeSubject2('');
// The inputs become valid again one at a time; the first of these two calls
// is the one that exposes the stale value from the other stream.
changeSubject1('NewHello');
changeSubject2('NewWorld');
would generate this output:
Hello, World
error
error
NewHello, World
NewHello, NewWorld
The output that I'm trying to achieve:
Hello, World
error
error
NewHello, NewWorld
In summary, I'm trying to make the combined stream emit only when the latest value of every stream is valid.
I was able to get what I wanted by creating an "error-aware" variation of the combineLatest function. The original combineLatest function works by creating a new Observable from a CombineLatestStream. I created a new stream class, namely ErrorAwareCombineLatestStream, that has almost the same implementation as CombineLatestStream; I only added a couple of lines that track the error state of each stream and emit only when all errors have been resolved.
Here is my implementation:
The ErrorAwareCombineLatestStream
class:
/// A [Stream] that combines the latest values of several source streams, but
/// only while none of the sources is in an error state.
///
/// Each source is tracked independently: an error event marks that source as
/// "in error" and is forwarded to listeners; the next data event from the
/// same source clears the mark. A combined value is emitted only when every
/// source has produced at least one value and no source is currently marked
/// as in error.
///
/// [T] is the combined result type; [A] through [I] are the types of the
/// (up to nine) source values handed to the combiner.
class ErrorAwareCombineLatestStream<T, A, B, C, D, E, F, G, H, I>
    extends Stream<T> {
  /// Controller whose stream backs this one; all [listen] calls are delegated
  /// to it.
  final StreamController<T> controller;

  /// Creates a stream that applies [combiner] to the latest values of
  /// [streams].
  ///
  /// The source streams are not subscribed to until this stream is listened
  /// to.
  ErrorAwareCombineLatestStream(Iterable<Stream<dynamic>> streams,
      T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i]))
      : controller = _buildController(streams, combiner);

  @override
  StreamSubscription<T> listen(void Function(T event) onData,
      {Function onError, void Function() onDone, bool cancelOnError}) {
    return controller.stream.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }

  /// Builds the controller that subscribes to every stream in [streams] when
  /// it gains a listener and cancels those subscriptions when the listener
  /// cancels.
  static StreamController<T> _buildController<T, A, B, C, D, E, F, G, H, I>(
      Iterable<Stream<dynamic>> streams,
      T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i])) {
    final subscriptions = List<StreamSubscription<dynamic>>(streams.length);
    StreamController<T> controller;

    controller = StreamController<T>(
        sync: true,
        onListen: () {
          // Per-source state, indexed by the source's position in [streams].
          final values = List<dynamic>(streams.length);
          final triggered = List<bool>.filled(streams.length, false);
          final completedStatus = List<bool>.filled(streams.length, false);
          final hasError = List<bool>.filled(streams.length, false);

          for (int i = 0, len = streams.length; i < len; i++) {
            final stream = streams.elementAt(i);

            subscriptions[i] = stream.listen((dynamic value) {
              values[i] = value;
              triggered[i] = true;
              // A fresh value resolves any earlier error from this source.
              hasError[i] = false;

              final allStreamsValid = triggered.every((bool seen) => seen) &&
                  !hasError.contains(true);

              if (allStreamsValid) {
                updateWithValues(combiner, values, controller);
              }
            }, onError: (dynamic e, StackTrace s) {
              // Remember that this source's latest event was an error so that
              // its stale value is not combined, and forward the error together
              // with its stack trace.
              hasError[i] = true;
              controller.addError(e, s);
            }, onDone: () {
              completedStatus[i] = true;

              // Close only once every source has completed.
              if (completedStatus.every((bool done) => done)) {
                controller.close();
              }
            });
          }
        },
        // `cancel()` results that are null are filtered out before waiting.
        onCancel: () => Future.wait<dynamic>(subscriptions
            .map((StreamSubscription<dynamic> subscription) =>
                subscription.cancel())
            .where((Future<dynamic> cancelFuture) => cancelFuture != null)));

    return controller;
  }

  /// Calls [combiner] with the entries of [values] and adds the result to
  /// [controller].
  ///
  /// Between two and nine values are dispatched to [combiner] positionally.
  /// Anything thrown along the way, including the error raised by
  /// `elementAt` when [values] has fewer than two entries, is added to
  /// [controller] as an error instead of propagating. With more than nine
  /// values no `case` matches and the unassigned `result` is added as is.
  static void updateWithValues<T, A, B, C, D, E, F, G, H, I>(
      T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i]),
      Iterable<dynamic> values,
      StreamController<T> controller) {
    try {
      final int len = values.length;
      final A a = values.elementAt(0);
      final B b = values.elementAt(1);
      T result;

      switch (len) {
        case 2:
          result = combiner(a, b);
          break;
        case 3:
          final C c = values.elementAt(2);
          result = combiner(a, b, c);
          break;
        case 4:
          final C c = values.elementAt(2);
          final D d = values.elementAt(3);
          result = combiner(a, b, c, d);
          break;
        case 5:
          final C c = values.elementAt(2);
          final D d = values.elementAt(3);
          final E e = values.elementAt(4);
          result = combiner(a, b, c, d, e);
          break;
        case 6:
          final C c = values.elementAt(2);
          final D d = values.elementAt(3);
          final E e = values.elementAt(4);
          final F f = values.elementAt(5);
          result = combiner(a, b, c, d, e, f);
          break;
        case 7:
          final C c = values.elementAt(2);
          final D d = values.elementAt(3);
          final E e = values.elementAt(4);
          final F f = values.elementAt(5);
          final G g = values.elementAt(6);
          result = combiner(a, b, c, d, e, f, g);
          break;
        case 8:
          final C c = values.elementAt(2);
          final D d = values.elementAt(3);
          final E e = values.elementAt(4);
          final F f = values.elementAt(5);
          final G g = values.elementAt(6);
          final H h = values.elementAt(7);
          result = combiner(a, b, c, d, e, f, g, h);
          break;
        case 9:
          final C c = values.elementAt(2);
          final D d = values.elementAt(3);
          final E e = values.elementAt(4);
          final F f = values.elementAt(5);
          final G g = values.elementAt(6);
          final H h = values.elementAt(7);
          final I i = values.elementAt(8);
          result = combiner(a, b, c, d, e, f, g, h, i);
          break;
      }

      controller.add(result);
    } catch (e, s) {
      controller.addError(e, s);
    }
  }
}
The errorAwareCombineLatest2
function:
/// Combines [streamOne] and [streamTwo] through an
/// [ErrorAwareCombineLatestStream], applying [combiner] to their latest
/// values, and wraps the result in an [Observable].
Observable<T> errorAwareCombineLatest2<A, B, T>(
    Stream<A> streamOne, Stream<B> streamTwo, T combiner(A a, B b)) {
  final sources = <Stream<dynamic>>[streamOne, streamTwo];

  // Adapts the two-argument combiner to the nine-slot signature the stream
  // class expects; the seven unused slots are typed Null and ignored.
  T adapter(A first, B second,
          [Null c, Null d, Null e, Null f, Null g, Null h, Null i]) =>
      combiner(first, second);

  return Observable<T>(ErrorAwareCombineLatestStream<T, A, B, Null, Null, Null,
      Null, Null, Null, Null>(sources, adapter));
}