I have a somewhat complex work processing stream where I don't get all of the events in flutter, but I do in cli.
I have a repro where I've simulated the work. Basically, work items are downloaded in batches. Each item takes around a second to process, but it's all async work that dart offloads to OS; therefore, I've used rxdart flatMap
with maxConcurrency
to process items concurrently. An event is dispatched for each item that is processed.
The cli gets all events, but the flutter gui does not. Here are the dartpads:
cli: https://dartpad.dev/?id=4b0c9e9383f07910677e2865d31be1bc
gui: https://dartpad.dev/?id=3981c77fb0814eb6e311fb083431cc4d
What am I doing wrong?
Note: I posted to flutter discord and a kind user said, "I'm not sure, but maybe StreamBuilder
just doesn't react to every stream event. UI is made to be optimized, so two events per frame may just skip one, not sure"
It's an interesting thought that I figured I'd share in case it jogs someone's memory and can confirm. Of course either way, I'm hoping for a workaround/fix.
cli:
import 'dart:async';
import 'generator.dart';
Future<void> main() async {
final generator = Generator();
var stream = generator.start();
var eventCount = 0;
await for (final i in stream) {
print('eventCount: ${++eventCount}');
print('cli_main: $i');
}
}
flutter:
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';
import 'generator.dart';
// See the console for which events are missing and other output.
void main() {
runApp(const MyApp());
}
class MyApp extends StatelessWidget {
const MyApp({super.key});
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
theme: ThemeData(
primarySwatch: Colors.blue,
brightness: Brightness.dark,
),
home: const GeneratorScreen(),
);
}
}
class GeneratorScreen extends StatefulWidget {
const GeneratorScreen({Key? key}) : super(key: key);
@override
State<GeneratorScreen> createState() => _GeneratorScreenState();
}
class _GeneratorScreenState extends State<GeneratorScreen> {
var _eventCount = 0;
final _stream = Generator().start();
final _eventSet = <int>{};
Widget _showData(int event) {
_eventCount++;
if (!_eventSet.add(event)) {
print('!!! $event already in set !!!!');
}
print('_showData: $event');
print('_eventCount: $_eventCount');
return Row(
children: [
Text('event data: $event, event count: $_eventCount'),
Expanded(
child: LinearProgressIndicator(
value: clampDouble(_eventCount / 100.0, 0, 100),
),
),
],
);
}
@override
Widget build(BuildContext context) {
return Scaffold(
body: StreamBuilder(
stream: _stream,
builder: (BuildContext context, AsyncSnapshot<int?> snapshot) {
if (snapshot.hasData) {
final widget = _showData(snapshot.data!);
if (snapshot.connectionState == ConnectionState.done) {
final expected = [for (int i = 1; i <= 100; i++) i];
final missing = expected.where((i) => !_eventSet.contains(i));
print('_eventSet missing: $missing');
}
return widget;
} else if (snapshot.hasError) {
print('!!! ${snapshot.error} !!!');
return Text(snapshot.error.toString());
} else {
return const CircularProgressIndicator();
}
},
),
);
}
}
common:
import 'dart:async';
import 'dart:math';
import 'package:rxdart/rxdart.dart';
class Generator {
Stream<int> start() {
final eventStreamController = StreamController<int>();
unawaited(_doWork(eventStreamController.sink).whenComplete(
() {
eventStreamController.close();
},
));
return eventStreamController.stream;
}
Future<int> _doWork(StreamSink<int> eventSink) async {
final workItemStream = _workItemStream();
return await (workItemStream.flatMap(
// change to 1 and all events make it to UI, but much slower
// maxConcurrent: 1,
maxConcurrent: 10,
(workItem) async* {
yield (await _doWorkItem(workItem, eventSink));
},
).fold(0, (previous, element) => previous + element));
}
Future<int> _doWorkItem(int workItem, StreamSink<int> eventSink) async {
final ms = Random().nextInt(1500) + 250;
print('_doWorkItem: $workItem, ${ms}ms');
await Future.delayed(Duration(milliseconds: ms));
eventSink.add(workItem);
return workItem;
}
Stream<int> _workItemStream() async* {
var page = 0;
const pages = 5;
const perPage = 20;
while (page < pages) {
await Future.delayed(const Duration(milliseconds: 100));
var pageItems =
List<int>.generate(perPage, (i) => (page * perPage) + i + 1);
page++;
yield* Stream.fromIterable(pageItems);
}
}
}
As suggested, it turned out the answer essentially was that flutter couldn't keep up with the updates. I assumed that a StreamBuilder
would update the ui at each emit, but I think it's better described as marking the ui as needing an update at each emit. If ui is marked as needing an update multiple times before an update actually happens, then just one event will be used for the update.
The docs for StreamBuilder
state:
The builder is called at the discretion of the Flutter pipeline, and will thus receive a timing-dependent sub-sequence of the snapshots that represent the interaction with the stream.
My problem was depending on each event for state, but not realizing I may not see all events when using StreamBuilder
. I solved the repro by listening to the stream decoupled from ui updates (in initState), saving needed state to class variable, and creating a new stream to signal ui updates. Since class variables are being updated at every event, it no longer matters if the ui misses a few events - it will simply show the latest data at each update. (there may be better solutions)
updated dartpad: https://dartpad.dev/?id=f0f3c3a27a6766d9f1011bee918eef13
updated gui:
class _GeneratorScreenState extends State<GeneratorScreen> {
var _eventCount = 0;
final _stream = Generator().start();
final _uiStreamController = StreamController<int>();
late final Stream<int> _uiStream = _uiStreamController.stream;
final _eventSet = <int>{};
@override
void initState() {
super.initState();
unawaited(_uiListen());
}
Future<void> _uiListen() async {
await for (final event in _stream) {
_eventCount++;
if (!_eventSet.add(event)) {
print('!!! $event already in set !!!!');
}
print('_showData: $event');
print('_eventCount: $_eventCount');
_uiStreamController.add(event);
}
_uiStreamController.close();
}
Widget _showData(int event) {
return Row(
children: [
Text('event data: $event, event count: $_eventCount'),
Expanded(
child: LinearProgressIndicator(
value: clampDouble(_eventCount / 100.0, 0, 100),
),
),
],
);
}
@override
Widget build(BuildContext context) {
return Scaffold(
body: StreamBuilder(
stream: _uiStream,
builder: (BuildContext context, AsyncSnapshot<int?> snapshot) {
if (snapshot.hasData) {
final widget = _showData(snapshot.data!);
if (snapshot.connectionState == ConnectionState.done) {
final expected = [for (int i = 1; i <= 100; i++) i];
final missing = expected.where((i) => !_eventSet.contains(i));
print('_eventSet missing: $missing');
}
return widget;
} else if (snapshot.hasError) {
print('!!! ${snapshot.error} !!!');
return Text(snapshot.error.toString());
} else {
return const CircularProgressIndicator();
}
},
),
);
}
}