Search code examples
dartrxjsreactivexrxdart

RX: Handling Errors in combineLatest


I'm using combineLatest2 in RxDart, but I'm still confused about how it behaves when one of the source streams emits an error. This is my code:

/// Passes non-empty strings through unchanged and replaces an empty string
/// with a stream error.
final validator = StreamTransformer<String, String>.fromHandlers(
  handleData: (text, output) {
    if (text.isEmpty) {
      output.addError('Cannot be empty.');
    } else {
      output.add(text);
    }
  },
);

// First input: the raw subject, its validated stream, and its `add` tear-off.
final _subject1 = BehaviorSubject<String>();
final changeSubject1 = _subject1.sink.add;
final stream1 = _subject1.stream.transform(validator);

// Second input, wired up the same way as the first.
final _subject2 = BehaviorSubject<String>();
final changeSubject2 = _subject2.sink.add;
final stream2 = _subject2.stream.transform(validator);

/// Joins the latest values of [stream1] and [stream2] with a comma.
final combined = Observable.combineLatest2(stream1, stream2, (a, b) {
  return '$a, $b';
});

I have two subjects, and for each one I keep a reference to its stream and to its sink.add function. Before assigning the streams to their respective variables, I apply a transformer that only lets non-empty strings through; for an empty string it adds an error to the sink instead. Finally, I create another stream, combined, by combining the first two using Observable.combineLatest2.

The combined stream emits only once both of its source streams have emitted valid values. The issue I'm facing occurs when both streams have already emitted valid values, then both have emitted invalid values, and then one of them emits a valid value again. For that last event, the combined stream emits the new value of the updated stream together with the previous valid value of the other stream, even though that other stream has emitted an invalid value since then. Can I prevent this from happening? In other words, running this code:

// Print every combined value; print a fixed marker for every forwarded error.
combined.listen(print, onError: (error) => print('error'));

// Both inputs become valid: the first combined value is emitted.
changeSubject1('Hello');
changeSubject2('World');

// Both inputs become invalid: each empty string surfaces as an error.
changeSubject1('');
changeSubject2('');

// The inputs recover one at a time. With plain combineLatest2 the first
// recovery is paired with the other input's stale value ('World').
changeSubject1('NewHello');
changeSubject2('NewWorld');

would generate this output:

Hello, World
error
error
NewHello, World
NewHello, NewWorld

The output that I'm trying to achieve:

Hello, World
error
error
NewHello, NewWorld

In summary, I'm trying to make the combined stream emit only when the latest value of every source stream is valid.


Solution

  • I was able to get the behavior I wanted by writing an "error-aware" variation of combineLatest. The original combineLatest functions work by creating a new Observable from a CombineLatestStream. I created a new stream class, ErrorAwareCombineLatestStream, whose implementation is almost the same as CombineLatestStream's. I only added a few lines that track the error state of each source stream, so that it emits only when no source stream is in an error state.

    Here is my implementation:

    The ErrorAwareCombineLatestStream class:

    /// A `combineLatest`-style stream that tracks the error state of each source.
    ///
    /// For every source stream it keeps the latest value, whether the source has
    /// emitted at least once, and whether the source's most recent event was an
    /// error. A combined value is emitted only when every source has emitted and
    /// no source is currently flagged as errored. A data event clears the error
    /// flag of the source that produced it. Source errors are always forwarded.
    ///
    /// The stream closes once every source stream has completed.
    class ErrorAwareCombineLatestStream<T, A, B, C, D, E, F, G, H, I>
        extends Stream<T> {
      /// Controller whose stream backs [listen].
      final StreamController<T> controller;

      /// Combines the latest values of [streams] with [combiner].
      ///
      /// [combiner] is called with one positional argument per source stream;
      /// between 2 and 9 sources are supported.
      ErrorAwareCombineLatestStream(Iterable<Stream<dynamic>> streams,
          T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i]))
          : controller = _buildController(streams, combiner);

      @override
      StreamSubscription<T> listen(void Function(T event) onData,
          {Function onError, void Function() onDone, bool cancelOnError}) {
        return controller.stream.listen(onData,
            onError: onError, onDone: onDone, cancelOnError: cancelOnError);
      }

      /// Builds the controller that subscribes to every source on listen and
      /// cancels all of those subscriptions on cancel.
      static StreamController<T> _buildController<T, A, B, C, D, E, F, G, H, I>(
          Iterable<Stream<dynamic>> streams,
          T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i])) {
        // Snapshot the sources once so a lazy iterable is walked a single time
        // and every index refers to the same stream instance.
        final sources = streams.toList(growable: false);
        final count = sources.length;
        final subscriptions = <StreamSubscription<dynamic>>[];
        StreamController<T> controller;

        controller = StreamController<T>(
            sync: true,
            onListen: () {
              final values = List<dynamic>.filled(count, null);
              final triggered = List<bool>.filled(count, false);
              final completed = List<bool>.filled(count, false);
              final hasError = List<bool>.filled(count, false);

              for (var index = 0; index < count; index++) {
                subscriptions.add(sources[index].listen((dynamic value) {
                  values[index] = value;
                  triggered[index] = true;
                  // A fresh value resolves this source's pending error.
                  hasError[index] = false;

                  final allHaveEmitted = !triggered.contains(false);
                  final anyInError = hasError.contains(true);

                  if (allHaveEmitted && !anyInError) {
                    updateWithValues(combiner, values, controller);
                  }
                }, onError: (dynamic error, StackTrace stackTrace) {
                  hasError[index] = true;
                  // Forward the stack trace together with the error.
                  controller.addError(error, stackTrace);
                }, onDone: () {
                  completed[index] = true;

                  if (!completed.contains(false)) controller.close();
                }));
              }
            },
            onCancel: () => Future.wait<dynamic>(subscriptions
                .map((StreamSubscription<dynamic> subscription) =>
                    subscription.cancel())
                .where((Future<dynamic> cancelFuture) => cancelFuture != null)));

        return controller;
      }

      /// Calls [combiner] with [values] as positional arguments and adds the
      /// result to [controller].
      ///
      /// Anything thrown while combining is added to [controller] as an error
      /// instead of propagating. That includes the [ArgumentError] raised when
      /// the number of [values] is not between 2 and 9.
      static void updateWithValues<T, A, B, C, D, E, F, G, H, I>(
          T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i]),
          Iterable<dynamic> values,
          StreamController<T> controller) {
        try {
          // Index into a list instead of walking the iterable per argument.
          final v = values.toList(growable: false);
          T result;

          switch (v.length) {
            case 2:
              result = combiner(v[0], v[1]);
              break;
            case 3:
              result = combiner(v[0], v[1], v[2]);
              break;
            case 4:
              result = combiner(v[0], v[1], v[2], v[3]);
              break;
            case 5:
              result = combiner(v[0], v[1], v[2], v[3], v[4]);
              break;
            case 6:
              result = combiner(v[0], v[1], v[2], v[3], v[4], v[5]);
              break;
            case 7:
              result = combiner(v[0], v[1], v[2], v[3], v[4], v[5], v[6]);
              break;
            case 8:
              result = combiner(v[0], v[1], v[2], v[3], v[4], v[5], v[6], v[7]);
              break;
            case 9:
              result =
                  combiner(v[0], v[1], v[2], v[3], v[4], v[5], v[6], v[7], v[8]);
              break;
            default:
              // Report an unsupported arity instead of emitting a null result.
              throw ArgumentError.value(
                  v.length, 'values', 'Expected between 2 and 9 values');
          }

          controller.add(result);
        } catch (e, s) {
          controller.addError(e, s);
        }
      }
    }
    

    The errorAwareCombineLatest2 function:

    /// Combines [streamOne] and [streamTwo] through an
    /// [ErrorAwareCombineLatestStream] and wraps the result in an [Observable].
    ///
    /// [combiner] receives the latest value of each stream. The seven unused
    /// optional slots of the underlying combiner signature are typed as [Null]
    /// and ignored.
    Observable<T> errorAwareCombineLatest2<A, B, T>(
        Stream<A> streamOne, Stream<B> streamTwo, T combiner(A a, B b)) {
      final sources = <Stream<dynamic>>[streamOne, streamTwo];

      final combinedStream = ErrorAwareCombineLatestStream<T, A, B, Null, Null,
          Null, Null, Null, Null, Null>(
        sources,
        (A first, B second,
            [Null c, Null d, Null e, Null f, Null g, Null h, Null i]) {
          return combiner(first, second);
        },
      );

      return Observable<T>(combinedStream);
    }