I'd like to validate that multiple Streams emit elements in a specific order.
Let's take the following 2 Streams.
Stream<String> data;
Stream<SomeStatus> status;
with emitsInOrder
one might do the following:
expect(data, emitsInOrder(["a", "b", "c"]));
expect(status, emitsInOrder([SomeStatus.loading, SomeStatus.connected, SomeStatus.disconnected]));
But how can I easily test for something like this:
Map map = {"data": data, "status": status};
expect(map, emitsInOrder([
{"status": SomeStatus.loading},
{"status": SomeStatus.connected},
{"data": "a"},
... // you get the idea
]);
Thanks
Edit: I've added my attempt to solve this as an answer and would appreciate any feedback.
Here's what I came up with:
import 'dart:async';
import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';
class StreamKey<T> {
final String key;
final Stream<T> stream;
const StreamKey(this.key, this.stream);
StreamEventType emitted(T t) => StreamEmitted(key, t);
StreamEventType done() => StreamDone(key);
StreamEventType errored(dynamic error) => StreamErrored(key, error);
}
void expectStreamsInOrder(List<StreamKey> keys, dynamic matcher) {
Map<String, StreamSubscription> subs = {};
expect(keys.map((a) => a.key).toSet().length, keys.length, reason: "Keys need to be unique");
PublishSubject<StreamEventType> s = PublishSubject();
expect(s, matcher);
void checkEmpty() {
assert(!s.isClosed);
if(subs.values.isEmpty) {
s.close();
}
}
keys.forEach((key) {
subs[key.key] = key.stream.listen(
(data) {
s.add(key.emitted(data));
},
onError: (error) {
s.add(key.errored(error));
subs.remove(key.key).cancel();
checkEmpty();
},
onDone: () {
s.add(key.done());
subs.remove(key.key).cancel();
checkEmpty();
},
);
});
}
abstract class StreamEventType {
final String key;
const StreamEventType(this.key);
}
class StreamEmitted<T> extends StreamEventType {
final T data;
StreamEmitted(String key, this.data) : super(key);
@override
bool operator ==(Object other) =>
identical(this, other) ||
other is StreamEmitted &&
runtimeType == other.runtimeType &&
key == other.key &&
data == other.data;
@override
int get hashCode => data.hashCode ^ key.hashCode;
@override
String toString() => 'StreamEmitted{key: $key, data: $data}';
}
class StreamDone extends StreamEventType {
StreamDone(String key) : super(key);
@override
bool operator ==(Object other) =>
identical(this, other) ||
other is StreamDone &&
key == other.key &&
runtimeType == other.runtimeType;
@override
int get hashCode => key.hashCode;
@override
String toString() => 'StreamDone{key: $key}';
}
class StreamErrored extends StreamEventType {
final dynamic error;
StreamErrored(String key, this.error) : super(key);
@override
bool operator ==(Object other) =>
identical(this, other) ||
other is StreamErrored &&
runtimeType == other.runtimeType &&
key == other.key &&
error == other.error;
@override
int get hashCode => error.hashCode ^ key.hashCode;
@override
String toString() => 'StreamErrored{key: $key, error: $error}';
}
and
import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';
import 'streams_in_order.dart';
void main() {
group("$expectStreamsInOrder", () {
Subject<String> data; // ignore: close_sinks
Subject<SomeStatus> status; // ignore: close_sinks
StreamKey<String> keyData;
StreamKey<SomeStatus> keyStatus;
setUp(() {
data = PublishSubject(sync: true);
status = PublishSubject(sync: true);
keyData = StreamKey("data", data);
keyStatus = StreamKey("status", status);
});
test("example", () async {
expectStreamsInOrder([keyData, keyStatus], emitsInOrder([
keyStatus.emitted(SomeStatus.loading),
keyStatus.emitted(SomeStatus.connected),
keyData.emitted("a"),
keyStatus.emitted(SomeStatus.disconnected),
keyData.done(),
keyStatus.done(),
emitsDone,
]));
status.add(SomeStatus.loading);
status.add(SomeStatus.connected);
data.add("a");
status.add(SomeStatus.disconnected);
await data.close();
await status.close();
});
test("example with error", () async {
expectStreamsInOrder([keyData, keyStatus], emitsInOrder([
keyStatus.emitted(SomeStatus.loading),
keyStatus.emitted(SomeStatus.connected),
keyData.emitted("a"),
keyStatus.emitted(SomeStatus.disconnected),
keyData.errored("err1"),
keyStatus.errored("err2"),
emitsDone,
]));
status.add(SomeStatus.loading);
status.add(SomeStatus.connected);
data.add("a");
status.add(SomeStatus.disconnected);
data.addError("err1");
status.addError("err2");
});
test("-", () async {
expectStreamsInOrder([keyData, keyStatus], emitsInOrder([
keyData.emitted("aa"),
keyStatus.emitted(SomeStatus.loading),
keyData.errored(123),
keyStatus.done(),
emitsDone,
]));
data.add("aa");
status.add(SomeStatus.loading);
data.addError(123);
await status.close();
});
test("-", () async {
expectStreamsInOrder([keyData, keyStatus], emitsInOrder([
keyData.emitted("aa"),
keyStatus.emitted(SomeStatus.loading),
keyStatus.done(),
keyData.errored(123),
emitsDone,
]));
data.add("aa");
status.add(SomeStatus.loading);
await status.close();
data.addError(123);
});
});
}
enum SomeStatus {
loading,
connected,
disconnected,
}