Search code examples
dartstreamdart-async

Dart : How to create a stream that aggregates events from another stream?


What is the best way to create a stream that needs to aggregate multiple events from another Stream ?

My goal is to create a stream that aggregates events from another stream until it has enough events to build a message. In my case I am reading data from the Socket stream so a message may be distributed across different events and an event may contain data for various messages, hence I can't just apply a map operation over each element.

It seems the correct would way would be to use a Stream Transformer yet I am having trouble finding information on how to implement it correctly and without too much boilerplate code.

I came up with a solution after reading about how to create streams but I am not sure if this is acceptable nor the best way to do it.

Here's my solution example:

Stream<String> joinWordsIfStartWithC(Stream<String> a) async* {
  var prevWord= '';
  await for (var i in a) {
    prevWord += i;
    if(i.startsWith('C')){
      yield prevWord;
      prevWord = '';
    }
  }
}

Stream<String> periodicStream(Duration interval) async* {
  while (true) {
    await Future.delayed(interval);
    yield 'C';
    yield 'A';
    yield 'B';
    yield 'C';
    yield 'C';
    yield 'B';
    yield 'C';
  }
}

void main(List<String> arguments) async {
  var intStream = periodicStream(Duration(seconds: 2));

  var sStream = joinWordsIfStartWithC(intStream);

  sStream.listen((s) => print(s));
}

Solution

  • I will say your solution seems to be fine but if you want to make a stream transformer it is rather easy by extending from StreamTransformerBase:

    import 'dart:async';
    
    class JoinWordsIfStartWithCTransformer extends StreamTransformerBase<String, String> {
      Stream<String> bind(Stream<String> a) async* {
        var prevWord = '';
        await for (var i in a) {
          prevWord += i;
          if (i.startsWith('C')) {
            yield prevWord;
            prevWord = '';
          }
        }
      }
    }
    
    Stream<String> periodicStream(Duration interval) async* {
      while (true) {
        await Future.delayed(interval);
        yield 'C';
        yield 'A';
        yield 'B';
        yield 'C';
        yield 'C';
        yield 'B';
        yield 'C';
      }
    }
    
    void main(List<String> arguments) async {
      var intStream = periodicStream(Duration(seconds: 2));
    
      var sStream = intStream.transform(JoinWordsIfStartWithCTransformer());
    
      sStream.listen((s) => print(s));
    }