What is the best way to create a stream that aggregates multiple events from another Stream?
My goal is to create a stream that aggregates events from another stream until it has enough events to build a message. In my case I am reading data from a Socket stream, so a message may be spread across several events and a single event may contain data for several messages. Hence I can't just apply a map operation over each element.
It seems the correct way would be to use a StreamTransformer, yet I am having trouble finding information on how to implement one correctly and without too much boilerplate code.
I came up with a solution after reading about how to create streams, but I am not sure whether it is acceptable or the best way to do it.
Here's my solution example:
Stream<String> joinWordsIfStartWithC(Stream<String> a) async* {
  var prevWord = '';
  await for (var i in a) {
    prevWord += i;
    if (i.startsWith('C')) {
      yield prevWord;
      prevWord = '';
    }
  }
}

Stream<String> periodicStream(Duration interval) async* {
  while (true) {
    await Future.delayed(interval);
    yield 'C';
    yield 'A';
    yield 'B';
    yield 'C';
    yield 'C';
    yield 'B';
    yield 'C';
  }
}

void main(List<String> arguments) async {
  var intStream = periodicStream(Duration(seconds: 2));
  var sStream = joinWordsIfStartWithC(intStream);
  sStream.listen((s) => print(s));
}
Your solution looks fine to me, but if you want a stream transformer, it is fairly easy to make one by extending StreamTransformerBase:
import 'dart:async';

class JoinWordsIfStartWithCTransformer
    extends StreamTransformerBase<String, String> {
  // bind receives the source stream and returns the transformed stream.
  @override
  Stream<String> bind(Stream<String> a) async* {
    var prevWord = '';
    await for (var i in a) {
      // Buffer every event; emit and reset once an event starts with 'C'.
      prevWord += i;
      if (i.startsWith('C')) {
        yield prevWord;
        prevWord = '';
      }
    }
  }
}

Stream<String> periodicStream(Duration interval) async* {
  while (true) {
    await Future.delayed(interval);
    yield 'C';
    yield 'A';
    yield 'B';
    yield 'C';
    yield 'C';
    yield 'B';
    yield 'C';
  }
}

void main(List<String> arguments) async {
  var intStream = periodicStream(Duration(seconds: 2));
  var sStream = intStream.transform(JoinWordsIfStartWithCTransformer());
  sStream.listen((s) => print(s));
}
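
If even a small class feels like too much boilerplate, `StreamTransformer.fromBind` from `dart:async` can wrap a plain `async*` function as a transformer. Below is a minimal sketch that applies the same buffering idea to the socket case from the question. It assumes each message ends with a newline byte, which the question does not state, so treat the framing as a placeholder. The names `splitMessages` and `messageTransformer` are made up for this example.

```dart
import 'dart:async';
import 'dart:convert';

// Hypothetical framing: every message ends with '\n' (0x0A). The question
// does not say how messages are delimited, so this is only an assumption.
Stream<String> splitMessages(Stream<List<int>> chunks) async* {
  final buffer = <int>[];
  await for (final chunk in chunks) {
    for (final byte in chunk) {
      if (byte == 0x0A) {
        // A complete message has been collected: emit it and start over.
        yield utf8.decode(buffer);
        buffer.clear();
      } else {
        buffer.add(byte);
      }
    }
  }
}

// Wraps the function as a reusable transformer without declaring a class.
final messageTransformer =
    StreamTransformer<List<int>, String>.fromBind(splitMessages);

Future<void> main() async {
  // Stand-in for socket events: the first message is split across two
  // chunks, and the second chunk also carries a whole second message.
  final chunks = Stream<List<int>>.fromIterable([
    utf8.encode('hel'),
    utf8.encode('lo\nworld\n'),
  ]);
  final messages = await chunks.transform(messageTransformer).toList();
  print(messages); // [hello, world]
}
```

With a real `Socket`, whose events are `Uint8List`, you will probably need `socket.cast<List<int>>()` before calling `transform`. Note also that any bytes left in the buffer when the stream closes are dropped in this sketch, just as a trailing partial word is dropped in the examples above.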