Search code examples
flutter dart stream rxdart

flutter missing stream events, but dart cli receives all events


I have a somewhat complex work processing stream where I don't get all of the events in flutter, but I do in cli.

I have a repro where I've simulated the work. Basically, work items are downloaded in batches. Each item takes around a second to process, but it's all async work that Dart offloads to the OS; therefore, I've used rxdart's flatMap with maxConcurrent to process items concurrently. An event is dispatched for each item that is processed.

The cli gets all events, but the flutter gui does not. Here are the dartpads:
cli: https://dartpad.dev/?id=4b0c9e9383f07910677e2865d31be1bc
gui: https://dartpad.dev/?id=3981c77fb0814eb6e311fb083431cc4d

What am I doing wrong?

Note: I posted to flutter discord and a kind user said, "I'm not sure, but maybe StreamBuilder just doesn't react to every stream event. UI is made to be optimized, so two events per frame may just skip one, not sure"
It's an interesting thought that I figured I'd share in case it jogs someone's memory and can confirm. Of course either way, I'm hoping for a workaround/fix.

cli:

import 'dart:async';

import 'generator.dart';

/// Consumes every event from a fresh [Generator] run, printing a running
/// event count followed by the event itself.
Future<void> main() async {
  var received = 0;
  await Generator().start().forEach((event) {
    received += 1;
    print('eventCount: $received');
    print('cli_main: $event');
  });
}

flutter:

import 'dart:async';

import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';

import 'generator.dart';

// See the console for which events are missing and other output.

/// Flutter entry point: mounts [MyApp] as the root widget.
void main() => runApp(const MyApp());

/// Root widget: a dark, blue-swatched [MaterialApp] whose home screen is
/// [GeneratorScreen].
class MyApp extends StatelessWidget {
  const MyApp({super.key});

  @override
  Widget build(BuildContext context) {
    final darkBlueTheme = ThemeData(
      primarySwatch: Colors.blue,
      brightness: Brightness.dark,
    );

    return MaterialApp(
      title: 'Flutter Demo',
      theme: darkBlueTheme,
      home: const GeneratorScreen(),
    );
  }
}

/// Screen that displays the events produced by a [Generator] run.
class GeneratorScreen extends StatefulWidget {
  // Super parameter, matching the constructor style already used by [MyApp].
  const GeneratorScreen({super.key});

  @override
  State<GeneratorScreen> createState() => _GeneratorScreenState();
}

/// State for [GeneratorScreen] that feeds the generator stream straight into a
/// [StreamBuilder].
///
/// All bookkeeping (event count, seen-event set, console logging) happens
/// inside the builder via [_showData], so it only reflects the snapshots the
/// builder is actually invoked with.
class _GeneratorScreenState extends State<GeneratorScreen> {
  var _eventCount = 0;
  final _stream = Generator().start();
  final _eventSet = <int>{};

  /// Records [event] as seen, logs it, and returns a row showing the event
  /// next to a progress bar.
  Widget _showData(int event) {
    _eventCount += 1;
    final isFirstSighting = _eventSet.add(event);
    if (!isFirstSighting) {
      print('!!! $event already in set !!!!');
    }
    print('_showData: $event');
    print('_eventCount: $_eventCount');

    final label = Text('event data: $event, event count: $_eventCount');
    final progressBar = LinearProgressIndicator(
      value: clampDouble(_eventCount / 100.0, 0, 100),
    );
    return Row(children: [label, Expanded(child: progressBar)]);
  }

  /// Maps a stream snapshot to a widget: the data row when a value is present,
  /// the error text on failure, and a spinner otherwise.
  Widget _buildSnapshot(BuildContext context, AsyncSnapshot<int?> snapshot) {
    if (!snapshot.hasData) {
      if (!snapshot.hasError) {
        return const CircularProgressIndicator();
      }
      print('!!! ${snapshot.error} !!!');
      return Text(snapshot.error.toString());
    }

    final row = _showData(snapshot.data!);
    if (snapshot.connectionState == ConnectionState.done) {
      // The stream has closed: report which of the ids 1..100 were never
      // recorded by the builder.
      final missing = Iterable<int>.generate(100, (index) => index + 1)
          .where((id) => !_eventSet.contains(id));
      print('_eventSet missing: $missing');
    }
    return row;
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      body: StreamBuilder(stream: _stream, builder: _buildSnapshot),
    );
  }
}

common:

import 'dart:async';
import 'dart:math';

import 'package:rxdart/rxdart.dart';

/// Simulates a paged, concurrent work pipeline and reports progress events.
///
/// Work items are produced page by page, processed up to ten at a time, and
/// each finished item is emitted on the stream returned by [start].
class Generator {
  /// Shared source of the simulated per-item processing delay.
  static final _random = Random();

  /// Starts the simulated work and returns a stream that emits each work item
  /// once it has been processed.
  ///
  /// The stream closes when all work has finished. If the work fails, the
  /// failure is delivered to the listener as a stream error before the stream
  /// closes.
  Stream<int> start() {
    final eventStreamController = StreamController<int>();
    unawaited(_run(eventStreamController));
    return eventStreamController.stream;
  }

  /// Drives [_doWork] to completion, forwarding any failure to [controller]
  /// and always closing it afterwards.
  Future<void> _run(StreamController<int> controller) async {
    try {
      await _doWork(controller.sink);
    } catch (error, stackTrace) {
      // Forward instead of dropping: the caller only holds the stream, so this
      // is the one channel through which it can observe a failure.
      controller.addError(error, stackTrace);
    } finally {
      // Not awaited: the future returned by close() only completes once a
      // listener has received (or cancelled before) the done event.
      unawaited(controller.close());
    }
  }

  /// Processes every work item with bounded concurrency and returns the sum of
  /// the processed items.
  ///
  /// Each processed item is also added to [eventSink] by [_doWorkItem].
  Future<int> _doWork(StreamSink<int> eventSink) {
    return _workItemStream()
        .flatMap(
          (workItem) async* {
            yield await _doWorkItem(workItem, eventSink);
          },
          // change to 1 and all events make it to UI, but much slower
          // maxConcurrent: 1,
          maxConcurrent: 10,
        )
        .fold<int>(0, (previous, element) => previous + element);
  }

  /// Simulates processing [workItem] by waiting 250-1749 ms, then reports it
  /// on [eventSink] and returns it.
  Future<int> _doWorkItem(int workItem, StreamSink<int> eventSink) async {
    final ms = _random.nextInt(1500) + 250;
    print('_doWorkItem: $workItem, ${ms}ms');
    await Future<void>.delayed(Duration(milliseconds: ms));
    eventSink.add(workItem);
    return workItem;
  }

  /// Emits the work items 1..100 as five pages of twenty, pausing 100 ms
  /// before each page to simulate a download.
  Stream<int> _workItemStream() async* {
    const pages = 5;
    const perPage = 20;
    for (var page = 0; page < pages; page++) {
      await Future<void>.delayed(const Duration(milliseconds: 100));
      final firstItem = page * perPage + 1;
      yield* Stream.fromIterable(
        List<int>.generate(perPage, (i) => firstItem + i),
      );
    }
  }
}

Solution

  • As suggested, it turned out the answer essentially was that Flutter couldn't keep up with the updates. I assumed that a StreamBuilder would update the UI at each emit, but it's better described as marking the UI as needing an update at each emit. If the UI is marked as needing an update multiple times before an update actually happens, then only the most recent event is used for that update; the earlier ones are never passed to the builder.

    The docs for StreamBuilder state:

    The builder is called at the discretion of the Flutter pipeline, and will thus receive a timing-dependent sub-sequence of the snapshots that represent the interaction with the stream.

    My problem was depending on each event for state, without realizing that I may not see all events when using StreamBuilder. I solved the repro by listening to the stream decoupled from UI updates (in initState), saving the needed state in class variables, and creating a new stream to signal UI updates. Since the class variables are updated at every event, it no longer matters if the UI misses a few events - it will simply show the latest data at each update. (There may be better solutions.)

    updated dartpad: https://dartpad.dev/?id=f0f3c3a27a6766d9f1011bee918eef13

    updated gui:

    /// State for [GeneratorScreen] that tracks every generator event, however
    /// often the UI actually rebuilds.
    ///
    /// The generator stream is consumed by a subscription started in
    /// [initState], which updates [_eventCount] and [_eventSet] for each event
    /// and then forwards the event to a second stream. The [StreamBuilder] in
    /// [build] listens to that second stream only as a repaint signal and reads
    /// the already-updated fields.
    class _GeneratorScreenState extends State<GeneratorScreen> {
      /// Number of events one generator run is expected to emit (ids 1..100).
      static const _expectedEventCount = 100;

      var _eventCount = 0;
      final _stream = Generator().start();
      final _uiStreamController = StreamController<int>();
      late final Stream<int> _uiStream = _uiStreamController.stream;
      final _eventSet = <int>{};

      /// Subscription to [_stream]; kept so [dispose] can cancel it.
      StreamSubscription<int>? _subscription;

      @override
      void initState() {
        super.initState();
        _subscription = _stream.listen(
          _onEvent,
          // Surface source failures through the snapshot.hasError branch in
          // [build] instead of leaving them unhandled.
          onError: _uiStreamController.addError,
          onDone: _uiStreamController.close,
        );
      }

      @override
      void dispose() {
        // Stop consuming generator events first so nothing is added to the UI
        // controller after it has been closed.
        unawaited(_subscription?.cancel());
        unawaited(_uiStreamController.close());
        super.dispose();
      }

      /// Records [event] in the class state, logs it, then signals the UI.
      void _onEvent(int event) {
        _eventCount++;
        if (!_eventSet.add(event)) {
          print('!!! $event already in set !!!!');
        }
        print('_showData: $event');
        print('_eventCount: $_eventCount');
        _uiStreamController.add(event);
      }

      /// Builds the row for the latest [event] using the current [_eventCount].
      Widget _showData(int event) {
        return Row(
          children: [
            Text('event data: $event, event count: $_eventCount'),
            Expanded(
              child: LinearProgressIndicator(
                // The indicator's value is a fraction, so clamp to 0..1.
                value: clampDouble(_eventCount / _expectedEventCount, 0, 1),
              ),
            ),
          ],
        );
      }

      @override
      Widget build(BuildContext context) {
        return Scaffold(
          body: StreamBuilder<int>(
            stream: _uiStream,
            builder: (BuildContext context, AsyncSnapshot<int> snapshot) {
              final event = snapshot.data;
              if (event != null) {
                final row = _showData(event);

                if (snapshot.connectionState == ConnectionState.done) {
                  // The UI stream has closed: report any expected id that was
                  // never recorded by [_onEvent].
                  final expected = [
                    for (var i = 1; i <= _expectedEventCount; i++) i,
                  ];
                  final missing = expected.where((i) => !_eventSet.contains(i));
                  print('_eventSet missing: $missing');
                }
                return row;
              } else if (snapshot.hasError) {
                print('!!! ${snapshot.error} !!!');
                return Text(snapshot.error.toString());
              } else {
                return const CircularProgressIndicator();
              }
            },
          ),
        );
      }
    }