Search code examples
dart flutter flutter-test rxdart

How to test whether multiple Streams emit in a specific order?


I'd like to validate that multiple Streams emit elements in a specific order.

Let's take the following 2 Streams.

Stream<String> data;
Stream<SomeStatus> status;

With emitsInOrder, one might do the following:

expect(data, emitsInOrder(["a", "b", "c"]));
expect(status, emitsInOrder([SomeStatus.loading, SomeStatus.connected, SomeStatus.disconnected]));

But how can I easily test for something like this:

Map map = {"data": data, "status": status};
expect(map, emitsInOrder([
   {"status": SomeStatus.loading},
   {"status": SomeStatus.connected},
   {"data": "a"},
   ...  // you get the idea
]));

Thanks

Edit: I've added my attempt to solve this as an answer and would appreciate any feedback.


Solution

  • Here's what I came up with:

    import 'dart:async';
    import 'package:rxdart/rxdart.dart';
    import 'package:test/test.dart';
    
    /// Associates a [stream] with a string [key] so that events coming from
    /// several streams can be told apart once they are merged.
    ///
    /// The factory methods build the [StreamEventType] values that describe
    /// what this particular stream is expected to do.
    class StreamKey<T> {
        const StreamKey(this.key, this.stream);

        /// Label attached to every event created through this instance.
        final String key;

        /// The stream labelled by [key].
        final Stream<T> stream;

        /// Describes this stream emitting the data event [t].
        StreamEventType emitted(T t) {
            return StreamEmitted<T>(key, t);
        }

        /// Describes this stream completing.
        StreamEventType done() {
            return StreamDone(key);
        }

        /// Describes this stream emitting [error].
        StreamEventType errored(dynamic error) {
            return StreamErrored(key, error);
        }
    }
    
    /// Merges the events of every stream in [keys] into a single stream of
    /// [StreamEventType]s and checks that merged stream against [matcher].
    ///
    /// Every data event, error and completion of a source stream is forwarded
    /// as the corresponding [StreamKey.emitted], [StreamKey.errored] or
    /// [StreamKey.done] value, so [matcher] (typically `emitsInOrder([...])`)
    /// can assert on the interleaving across streams.
    ///
    /// A source stream is dropped after its first error or when it completes.
    /// The merged stream is closed once no source stream is left, which is
    /// what lets a trailing `emitsDone` match. If [keys] is empty the merged
    /// stream is closed right away.
    ///
    /// Fails the running test if two entries in [keys] share the same key.
    void expectStreamsInOrder(List<StreamKey> keys, dynamic matcher) {
        // Live subscriptions, indexed by the key of the stream they listen to.
        final subs = <String, StreamSubscription>{};

        expect(keys.map((a) => a.key).toSet().length, keys.length, reason: "Keys need to be unique");

        final s = PublishSubject<StreamEventType>();

        // Register the expectation before any source event can be forwarded.
        expect(s, matcher);

        void checkEmpty() {
            assert(!s.isClosed);
            if (subs.isEmpty) {
                s.close();
            }
        }

        // Stops listening to the stream registered under [key] and closes the
        // merged stream if that was the last one.
        void finish(String key) {
            // `remove` yields null for an unknown key; `?.` keeps that harmless.
            subs.remove(key)?.cancel();
            checkEmpty();
        }

        for (final key in keys) {
            subs[key.key] = key.stream.listen(
                (data) => s.add(key.emitted(data)),
                onError: (error) {
                    // The first error is treated as terminal for this stream.
                    s.add(key.errored(error));
                    finish(key.key);
                },
                onDone: () {
                    s.add(key.done());
                    finish(key.key);
                },
            );
        }

        // With nothing to listen to, no callback would ever close the merged
        // stream and an `emitsDone` expectation would hang until timeout.
        if (keys.isEmpty) {
            checkEmpty();
        }
    }
    
    /// Base type of the events produced for a labelled stream.
    ///
    /// Concrete subclasses in this file describe a data event, a completion
    /// and an error respectively.
    abstract class StreamEventType {
        /// Label of the stream this event belongs to.
        final String key;
    
        /// Creates an event for the stream labelled [key].
        const StreamEventType(this.key);
    }
    
    /// Event stating that the stream labelled [key] emitted [data].
    ///
    /// Two instances are equal when they have the same runtime type, the same
    /// [key] and equal [data].
    class StreamEmitted<T> extends StreamEventType {
        StreamEmitted(String key, this.data) : super(key);

        /// The emitted value.
        final T data;

        @override
        bool operator ==(Object other) {
            if (identical(this, other)) {
                return true;
            }
            if (other is StreamEmitted) {
                return runtimeType == other.runtimeType &&
                    key == other.key &&
                    data == other.data;
            }
            return false;
        }

        @override
        int get hashCode {
            return data.hashCode ^ key.hashCode;
        }

        @override
        String toString() {
            return 'StreamEmitted{key: $key, data: $data}';
        }
    }
    
    /// Event stating that the stream labelled [key] completed.
    ///
    /// Two instances are equal when they have the same [key] and the same
    /// runtime type.
    class StreamDone extends StreamEventType {
        StreamDone(String key) : super(key);

        @override
        bool operator ==(Object other) {
            if (identical(this, other)) {
                return true;
            }
            if (other is StreamDone) {
                return key == other.key && runtimeType == other.runtimeType;
            }
            return false;
        }

        @override
        int get hashCode {
            return key.hashCode;
        }

        @override
        String toString() {
            return 'StreamDone{key: $key}';
        }
    }
    
    /// Event stating that the stream labelled [key] emitted [error].
    ///
    /// Two instances are equal when they have the same runtime type, the same
    /// [key] and equal [error] objects.
    class StreamErrored extends StreamEventType {
        StreamErrored(String key, this.error) : super(key);

        /// The error object that was emitted.
        final dynamic error;

        @override
        bool operator ==(Object other) {
            if (identical(this, other)) {
                return true;
            }
            if (other is StreamErrored) {
                return runtimeType == other.runtimeType &&
                    key == other.key &&
                    error == other.error;
            }
            return false;
        }

        @override
        int get hashCode {
            return error.hashCode ^ key.hashCode;
        }

        @override
        String toString() {
            return 'StreamErrored{key: $key, error: $error}';
        }
    }
    

    and the tests that exercise it:

    import 'package:rxdart/rxdart.dart';
    import 'package:test/test.dart';
    
    import 'streams_in_order.dart';
    
    /// Tests for `expectStreamsInOrder`, driving two synchronous subjects
    /// ("data" and "status") and asserting on the interleaving of their events.
    void main() {
        // A plain string is used for the group name: interpolating the function
        // tear-off would print a closure description instead of the name.
        group("expectStreamsInOrder", () {
            Subject<String> data; // ignore: close_sinks
            Subject<SomeStatus> status; // ignore: close_sinks
            StreamKey<String> keyData;
            StreamKey<SomeStatus> keyStatus;

            // Fresh subjects and keys per test so no state leaks between cases.
            setUp(() {
                data = PublishSubject(sync: true);
                status = PublishSubject(sync: true);
                keyData = StreamKey("data", data);
                keyStatus = StreamKey("status", status);
            });

            test("example", () async {
                expectStreamsInOrder([keyData, keyStatus], emitsInOrder([
                    keyStatus.emitted(SomeStatus.loading),
                    keyStatus.emitted(SomeStatus.connected),
                    keyData.emitted("a"),
                    keyStatus.emitted(SomeStatus.disconnected),
                    keyData.done(),
                    keyStatus.done(),
                    emitsDone,
                ]));

                status.add(SomeStatus.loading);
                status.add(SomeStatus.connected);
                data.add("a");
                status.add(SomeStatus.disconnected);
                await data.close();
                await status.close();
            });


            test("example with error", () async {
                expectStreamsInOrder([keyData, keyStatus], emitsInOrder([
                    keyStatus.emitted(SomeStatus.loading),
                    keyStatus.emitted(SomeStatus.connected),
                    keyData.emitted("a"),
                    keyStatus.emitted(SomeStatus.disconnected),
                    keyData.errored("err1"),
                    keyStatus.errored("err2"),
                    emitsDone,
                ]));

                status.add(SomeStatus.loading);
                status.add(SomeStatus.connected);
                data.add("a");
                status.add(SomeStatus.disconnected);
                data.addError("err1");
                status.addError("err2");
            });

            // Previously named "-", which collided with the next test's name.
            test("error on data followed by done on status", () async {
                expectStreamsInOrder([keyData, keyStatus], emitsInOrder([
                    keyData.emitted("aa"),
                    keyStatus.emitted(SomeStatus.loading),
                    keyData.errored(123),
                    keyStatus.done(),
                    emitsDone,
                ]));

                data.add("aa");
                status.add(SomeStatus.loading);
                data.addError(123);
                await status.close();
            });

            test("done on status followed by error on data", () async {
                expectStreamsInOrder([keyData, keyStatus], emitsInOrder([
                    keyData.emitted("aa"),
                    keyStatus.emitted(SomeStatus.loading),
                    keyStatus.done(),
                    keyData.errored(123),
                    emitsDone,
                ]));

                data.add("aa");
                status.add(SomeStatus.loading);
                await status.close();
                data.addError(123);
            });
        });
    }
    
    /// Sample status values emitted on the "status" stream in the tests above.
    enum SomeStatus {
        loading,
        connected,
        disconnected,
    }