
Applying a windowing function to a Dart stream


I am pretty new to Dart and still wrapping my head around streams. Specifically, I am having difficulty finding the proper way to write a function that takes a window of N elements from a stream, applies a function to it, and re-emits the results as a new stream.

To clarify what I mean, here is an example I implemented myself, which led me to this question. The code takes a byte stream from a file and converts each 4-byte chunk to an integer, producing an integer stream. Using await for, I was able to accomplish what I wanted, but I am looking for a more idiomatic, stream-based function that does the same thing more succinctly.

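import 'dart:io';
import 'dart:typed_data';

Stream<int> loadData(String path) async* {
  final f = File(path);
  final byteStream = f.openRead();
  // Holds the 4 bytes of the integer currently being assembled.
  final buffer = Uint8List(4);
  var i = 0;

  // This is where I would like to use a windowing function
  await for (var bs in byteStream) {
    for (var b in bs) {
      buffer[i++] = b;
      if (i == 4) {
        // A full 4-byte window is available; decode it (big-endian by default).
        final bytes = ByteData.view(buffer.buffer);
        yield bytes.getUint32(0);
        i = 0;
      }
    }
  }
}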

Solution

  • Look at the bufferCount method from the RxDart package.

    Buffers a number of values from the source Stream by count then emits the buffer and clears it, and starts a new buffer ...

    Here is an example:

    import 'dart:typed_data';
    
    import 'package:rxdart/rxdart.dart';
    
    main() {
      var bytes = Uint8List.fromList([255, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 2, 1, 0, 0]);
      Stream<int>.fromIterable(bytes)
          .bufferCount(4)
          .map((bytes) => Uint8List.fromList(bytes).buffer)
          .map((buffer) => ByteData.view(buffer).getInt32(0, Endian.little))
          .listen(print); // prints 255 256 257 258
    }
    

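    It is worth noting that when all the bytes are already in memory, as in this example, the task can be done much more easily. The resulting view reads the bytes in the host platform's byte order rather than an explicitly chosen one: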

        bytes.buffer.asInt32List();
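
For the file case in the question, the source is a Stream<List<int>> of arbitrarily sized chunks rather than a Stream<int>, so the bytes have to be regrouped across chunk boundaries. The following is a minimal sketch of the same windowing idea without RxDart. The names windowBytes and decodeUint32 are hypothetical helpers, not part of the Dart SDK or RxDart, and the little-endian byte order is an assumption taken from the answer's example.

```dart
import 'dart:typed_data';

/// Hypothetical helper: regroups a chunked byte stream (such as the one
/// returned by File.openRead) into fixed-size windows of [size] bytes.
Stream<Uint8List> windowBytes(Stream<List<int>> source, int size) async* {
  var window = Uint8List(size);
  var filled = 0;
  await for (final chunk in source) {
    for (final byte in chunk) {
      window[filled++] = byte;
      if (filled == size) {
        yield window;
        // Allocate a fresh list so windows already emitted are not overwritten.
        window = Uint8List(size);
        filled = 0;
      }
    }
  }
  // Trailing bytes that do not fill a whole window are dropped in this sketch.
}

/// Hypothetical helper: decodes each 4-byte window as an unsigned 32-bit
/// integer, assuming little-endian byte order.
Stream<int> decodeUint32(Stream<List<int>> source) =>
    windowBytes(source, 4)
        .map((w) => ByteData.view(w.buffer).getUint32(0, Endian.little));
```

With dart:io imported, this could be called as decodeUint32(File(path).openRead()). Each window gets a freshly allocated list, which costs an allocation per window but means a listener can safely hold on to earlier windows.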