flutter, dart, stream, dart-stream

How to create a StreamController from a database Stream using StreamTransformer


I am trying to transform a stream from a database, converting the database data model into the domain data model.

I am quite confused about how to bring the different pieces of information together. In the StreamTransformer examples I have found, each stream event is always a single object and not a list, whereas my stream, taken from the example for streaming ObjectBox data, is a Stream<List<PropertyObjectBox>>.

It also seems as if a piece of the puzzle is missing: how to get from a Stream to a StreamController.

So what do I have to change in the following code?

/// <PropertyObjectBox> data model of Property in ObjectBox database
/// <PropertyModel> data model in data layer (not really needed, I know)
/// <Property> data model in domain layer


@override
// -->> should this return Stream<List<Property>?> or Stream<Property>?
// -->> or something else to comply with handleError?
Stream<List<Property>?> streamOnDeviceProperties() {
  Stream<List<PropertyObjectBox>> propObStream = objectbox.propertyBox.query()
      .watch(triggerImmediately: true).map((query) => query
  // Watching the query produces a Stream<Query<Property>>
  // To get the actual data inside a List<Property>, we need to call find() on the query
      .find());

//-->> again, PropertyObjectBox or List<PropertyObjectBox>?
  var streamTransformer = StreamTransformer<PropertyObjectBox, dynamic>.fromHandlers(
    handleData: (PropertyObjectBox data, EventSink sink) {
      final propertyModel = PropertyModel.fromObjectbox(data);
      final Property property = propertyModel.toDomain();
      sink.add(property);
    },
    handleError: (Object error, StackTrace stacktrace, EventSink sink) {
      sink.addError('Something went wrong: $error');
    },
    handleDone: (EventSink sink) => sink.close(),
  );
//-->> next line causes error 'the getter 'stream' isn't defined for the type 'Stream<List<PropertyObjectBox>>'  
  var controllerStream = propObStream.stream.transform(streamTransformer);

Solution

  • This should do the trick (updated):

    // Requires: import 'dart:async';
    @override
    Stream<List<Property>?> streamOnDeviceProperties() {
      // watch() emits a Query on every change; find() returns the current results as a list.
      Stream<List<PropertyObjectBox>?> propObStream = objectbox.propertyBox.query()
          .watch(triggerImmediately: true).map((query) => query.find());

      // Each stream event is a whole list, so the transformer is typed on
      // List<PropertyObjectBox>?, not on a single PropertyObjectBox.
      final streamTransformer =
          StreamTransformer<List<PropertyObjectBox>?, List<Property>?>.fromHandlers(
        handleData: (List<PropertyObjectBox>? data, EventSink<List<Property>?> sink) {
          // Convert each database object to a domain object; a null list is passed through.
          final newList = data?.map((value) {
            final propertyModel = PropertyModel.fromObjectbox(value);
            return propertyModel.toDomain();
          }).toList();
          sink.add(newList);
        },
        handleError: (Object error, StackTrace stacktrace, EventSink<List<Property>?> sink) {
          sink.addError('Something went wrong: $error', stacktrace);
        },
        handleDone: (EventSink<List<Property>?> sink) => sink.close(),
      );

      // A Stream has no `stream` getter (only a StreamController does),
      // so call transform() on the stream directly.
      Stream<List<Property>?> newStream = propObStream.transform(streamTransformer);

      // Only if you need a controller (although I don't know why). In that case
      // return streamController.stream instead of newStream: if the source is a
      // single-subscription stream, it can only be listened to once.
      // final streamController = StreamController<List<Property>?>(
      //   onPause: () => print('Paused'),
      //   onResume: () => print('Resumed'),
      //   onCancel: () => print('Cancelled'),
      //   onListen: () => print('Listens'),
      // );
      // streamController.addStream(newStream);

      return newStream;
    }
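
    The list-to-list transformation can be tried without ObjectBox. The following is a minimal sketch, not the project's actual code: DbProperty, Property, toDomain, toDomainStream and toDomainStreamWithMap are hypothetical stand-ins for PropertyObjectBox, the domain Property and the mapping methods. It also shows that, when no custom error or done handling is needed, a plain map() on the stream converts the data events in the same way.

```dart
import 'dart:async';

// Hypothetical stand-in for the database model (PropertyObjectBox).
class DbProperty {
  DbProperty(this.id, this.name);
  final int id;
  final String name;
}

// Hypothetical stand-in for the domain model.
class Property {
  Property(this.id, this.name);
  final int id;
  final String name;
}

// Hypothetical mapping from database model to domain model.
Property toDomain(DbProperty db) => Property(db.id, db.name);

/// Converts every emitted list of database objects into a list of domain objects.
Stream<List<Property>> toDomainStream(Stream<List<DbProperty>> source) {
  final transformer =
      StreamTransformer<List<DbProperty>, List<Property>>.fromHandlers(
    // Each event is a whole list, so map its elements and emit a new list.
    handleData: (data, sink) => sink.add(data.map(toDomain).toList()),
    // Forward errors with an adjusted message and the original stack trace.
    handleError: (error, stackTrace, sink) =>
        sink.addError('Something went wrong: $error', stackTrace),
    // handleDone is omitted: by default the sink is closed when the source is done.
  );
  return source.transform(transformer);
}

/// Same conversion of data events using map(); errors pass through unchanged.
Stream<List<Property>> toDomainStreamWithMap(Stream<List<DbProperty>> source) =>
    source.map((list) => list.map(toDomain).toList());

Future<void> main() async {
  final source = Stream.fromIterable([
    [DbProperty(1, 'House')],
    [DbProperty(1, 'House'), DbProperty(2, 'Flat')],
  ]);
  await for (final properties in toDomainStream(source)) {
    print(properties.map((p) => p.name).toList());
  }
}
```

    Each event printed in main() is one complete list, which mirrors what a watched query delivers: the full result set on every change, not single objects.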
    

    Access the stream like this:

    Stream<List<Property>?> _mystream = streamOnDeviceProperties();
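
    If a StreamController really is required, one way to get from a Stream to a controller is to let the controller subscribe to the source when it gets its first listener, and to hand out controller.stream. This is only a sketch under that assumption; viaController is a hypothetical helper name and the integer lists stand in for the property lists.

```dart
import 'dart:async';

/// Wraps [source] in a StreamController and returns the controller's stream.
/// Hypothetical helper for illustration only.
Stream<T> viaController<T>(Stream<T> source) {
  late final StreamController<T> controller;
  StreamSubscription<T>? subscription;
  controller = StreamController<T>(
    // Subscribe to the source only once somebody listens to the controller.
    onListen: () {
      subscription = source.listen(
        controller.add,
        onError: controller.addError,
        onDone: controller.close,
      );
    },
    // Propagate pause, resume and cancel to the source subscription.
    onPause: () => subscription?.pause(),
    onResume: () => subscription?.resume(),
    onCancel: () => subscription?.cancel(),
  );
  return controller.stream;
}

Future<void> main() async {
  final stream = viaController(Stream.fromIterable([
    [1, 2],
    [3],
  ]));
  await for (final list in stream) {
    print(list);
  }
}
```

    The caller only ever listens to the returned controller.stream, so the source stream is subscribed to exactly once.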