Search code examples
Tags: flutter, dart, flutter-provider

How to pause and resume StreamProvider in Flutter


I am experimenting with live forex updates using StreamProvider. The demo auto-updates the exchange rate by periodically fetching the latest data from an external API (every 60 seconds in this example).

Below is the diagram of implementation.

Diagram

API call (Future event) --> Put data in stream
      ^                             |
      |                             V              
Wait for 60 seconds <--  StreamProvider listens for 
                        new event and rebuild widget

Problem

The stream keeps running even after navigating back to the main view.

If we use StreamBuilder, we may be able to call the listen() method, which returns a StreamSubscription. Then cancel(), pause(), or resume() can be called on it on demand. Is there a similar way to pause and resume while using StreamProvider?

Expected

Pause the stream when leaving the dashboard view and resume it when returning to the dashboard view.

Codes

Model


/// A currency pair together with its exchange rate.
///
/// Every field is nullable, so an instance may be only partially populated.
class Currency {
  /// Base side of the pair; displayed before [quote] on the dashboard.
  String? base;
  /// Quote side of the pair; displayed after [base] on the dashboard.
  String? quote;
  /// Exchange rate for the pair.
  double? rate;

  // constructor, factory constructor, etc.
  // ...
}

Controller

/// Fetches currency data and exposes it as a periodically refreshed stream.
class CurrencyService {
  Currency? _currency;

  /// Emits the result of [getCurrencyData] once every 60 seconds.
  ///
  /// Each read of this getter builds a new, independent periodic stream. The
  /// first event is delivered only after the first period has elapsed.
  Stream<Currency?> get currencyStream =>
      // `asyncMap` already awaits the future returned by its callback, so no
      // `async*`/`yield*` wrapper or extra `async => await` closure is needed.
      Stream<void>.periodic(const Duration(seconds: 60))
          .asyncMap((_) => getCurrencyData());

  /// Refreshes the stored [Currency] and returns it.
  ///
  /// Any error raised during the refresh is printed and otherwise ignored, in
  /// which case the previously stored value (possibly `null`) is returned.
  Future<Currency?> getCurrencyData() async {
    try {
      // Perform API call and
      // update Currency object
      // ...

    } catch (e) {
      print('Error: $e');
    }
    return _currency;
  }
}

View

/// Application entry point.
///
/// Fetches a first [Currency] value to seed the [StreamProvider], then mounts
/// [TestApp] beneath the provider list.
void main() async {
  // Resolved before the widget tree is assembled so it can be passed as the
  // provider's initial data.
  final seedCurrency = await CurrencyService().getCurrencyData();

  final app = MultiProvider(
    providers: [
      // some providers,
      // another one,
      // ...
      StreamProvider<Currency?>(
        create: (_) => CurrencyService().currencyStream,
        initialData: seedCurrency,
      ),
    ],
    child: TestApp(),
  );

  runApp(app);
}

/// Root widget: a [MaterialApp] that starts at `/` and resolves pages through
/// the named-route table `routes`.
class TestApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) => MaterialApp(
        title: 'Live Update Demo',
        initialRoute: '/',
        routes: routes,
      );
}

Main view (page 1)

/// First page of the demo; its UI is built by [_MainViewState].
class MainView extends StatefulWidget {
  const MainView({Key? key}) : super(key: key);

  @override
  _MainViewState createState() {
    return _MainViewState();
  }
}

/// State for [MainView]: renders a button that opens the dashboard route.
class _MainViewState extends State<MainView> {

  // ...

  /// Pushes the `/dashboard` named route onto the navigator.
  void _openDashboard() {
    Navigator.pushNamed(context, '/dashboard');
  }

  @override
  Widget build(BuildContext context) {
    final dashboardButton = ElevatedButton(
      onPressed: _openDashboard,
      child: Text('Dashboard'),
    );

    return Scaffold(
      body: Column(
        children: <Widget>[
          // ...
          dashboardButton,
        ],
      ),
    );
  }
}

Dashboard view (page 2)

/// Second page: shows the currency pair and its rate from the nearest
/// `Currency?` provider, rebuilding whenever a new value is published.
class DashboardView extends StatelessWidget {
  const DashboardView({Key? key}) : super(key: key);

  /// Lays out the pair label above the rate, centred on the page.
  ///
  /// A null [currency], or any null field on it, is rendered as an empty
  /// string.
  Widget _buildRates(BuildContext context, Currency? currency, Widget? child) {
    final pairLabel = '${currency?.base ?? ''}${currency?.quote ?? ''}';
    final rateLabel = '${currency?.rate ?? ''}';

    return Center(
      child: Column(
        mainAxisAlignment: MainAxisAlignment.center,
        children: [
          Container(child: Text(pairLabel)),
          Container(child: Text(rateLabel)),
        ],
      ),
    );
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      body: Consumer<Currency?>(builder: _buildRates),
    );
  }
}

Thank you.


Solution

  • Pausing and resuming a StreamProvider that is backed by Stream.periodic doesn't seem to be possible. Instead, the same behavior can be achieved using Timer.periodic and a StreamController, as suggested by @Abion47.

    We can simulate pause and resume by controlling when new data is added to the stream. One way is to start the Timer.periodic when navigating to the dashboard view (when the button is pressed) and to cancel the timer when returning to the main view (when the dashboard view is popped).

    // Sketch of the button handler: polling is active only while the
    // dashboard route is on the navigation stack.
    ElevatedButton(
      onPressed: () {
        // start timer
        // ...
        Navigator.pushNamed(...).then((_) {
          // stop timer
          // this callback runs once the dashboard route has been popped,
          // i.e. when returning from the dashboard to the main view
        });
      },
      // ...
    )
    

    Revised codes

    // Controller
    
    /// Polls for currency data on a timer and publishes each result on a
    /// broadcast stream.
    ///
    /// Polling is controlled explicitly through [startPolling] and
    /// [stopPolling]. Call [closeStream] once the service is no longer needed.
    class CurrencyService {
      Currency? _currency;
      Timer? _pollingTimer;

      // Broadcast, so listeners can attach and detach freely; events added
      // while nobody is listening are dropped rather than buffered.
      final StreamController<Currency?> _currencyController =
          StreamController<Currency?>.broadcast();

      /// Stream of the values published by [addCurrencyData].
      Stream<Currency?> get currencyStream => _currencyController.stream;

      /// Fetches the latest data and publishes it on [currencyStream].
      ///
      /// The result is discarded if the stream was closed while the fetch was
      /// still in flight.
      Future<void> addCurrencyData() async {
        final currency = await getCurrencyData();
        // Adding to a closed controller throws a StateError.
        if (!_currencyController.isClosed) {
          _currencyController.add(currency);
        }
      }

      /// Stops polling and closes the stream.
      ///
      /// After this call [startPolling] has no effect.
      void closeStream() {
        // Cancel the timer first so no later tick tries to publish.
        stopPolling();
        _currencyController.close();
      }

      /// Publishes a value immediately, then again every 60 seconds.
      ///
      /// Calling this while polling is already active restarts the schedule
      /// instead of leaving a second timer running.
      void startPolling() {
        if (_currencyController.isClosed) return;
        stopPolling();
        // Fire-and-forget on purpose: getCurrencyData handles its own errors.
        addCurrencyData();
        _pollingTimer = Timer.periodic(
          const Duration(seconds: 60),
          (_) => addCurrencyData(),
        );
      }

      /// Cancels the polling timer, if one is running. Safe to call repeatedly.
      void stopPolling() {
        _pollingTimer?.cancel();
        _pollingTimer = null;
      }

      /// Refreshes the stored [Currency] and returns it.
      ///
      /// Any error raised during the refresh is printed and otherwise ignored,
      /// in which case the previously stored value (possibly `null`) is
      /// returned.
      Future<Currency?> getCurrencyData() async {
        try {
          // Perform API call and
          // update Currency object
          // ...

        } catch (e) {
          print('Error: $e');
        }
        return _currency;
      }
    }
    
    // Main
    
    /// Application entry point.
    ///
    /// Exposes a single [CurrencyService] to the whole widget tree and releases
    /// its timer and stream when the provider is disposed.
    void main() {
      runApp(
        MultiProvider(
          providers: [
            // some providers,
            // another one,
            // ...
            Provider<CurrencyService>(
              create: (_) => CurrencyService(),
              dispose: (_, service) {
                // Stop polling before closing so that no timer tick can add
                // to an already closed stream.
                service.stopPolling();
                service.closeStream();
              },
            ),
          ],
          child: TestApp(),
        ),
      );
    }
    
    /// Root widget: a [MaterialApp] that starts at `/` and resolves pages
    /// through the named-route table `routes`.
    class TestApp extends StatelessWidget {
      @override
      Widget build(BuildContext context) => MaterialApp(
            title: 'Live Update Demo',
            initialRoute: '/',
            routes: routes,
          );
    }
    
    // Main view (page 1)
    
    /// First page of the demo: a button that opens the dashboard and keeps
    /// currency polling active only while the dashboard is on screen.
    class MainView extends StatelessWidget {
      const MainView({Key? key}) : super(key: key);

      // ...

      /// Starts polling, shows the dashboard, and stops polling once that
      /// route has been popped.
      Future<void> _openDashboard(BuildContext context) async {
        // Look the service up once, before the async gap, so `context` is
        // never used after the navigation future completes.
        final currencyService =
            Provider.of<CurrencyService>(context, listen: false);

        currencyService.startPolling();
        try {
          await Navigator.pushNamed(context, '/dashboard');
        } finally {
          // Always stop, even if the route future completes with an error.
          currencyService.stopPolling();
        }
      }

      @override
      Widget build(BuildContext context) {
        return Scaffold(
          body: Column(
            children: <Widget>[
              // ...
              ElevatedButton(
                onPressed: () => _openDashboard(context),
                child: const Text('Dashboard'),
              ),
            ],
          ),
        );
      }
    }
    
    // Dashboard view (page 2)
    
    /// Second page: feeds the service's currency stream to [CurrencyRate]
    /// through a [StreamProvider].
    class DashboardView extends StatelessWidget {
      const DashboardView({Key? key}) : super(key: key);

      @override
      Widget build(BuildContext context) {
        final rateStream =
            Provider.of<CurrencyService>(context).currencyStream;

        // Descendants read null until the stream delivers its first value.
        final rateProvider = StreamProvider<Currency?>.value(
          initialData: null,
          value: rateStream,
          child: CurrencyRate(),
        );

        return Scaffold(body: rateProvider);
      }
    }
    
    
    /// Shows the [Currency] supplied by the enclosing provider, or a progress
    /// indicator while no value is available.
    class CurrencyRate extends StatelessWidget {
      const CurrencyRate({Key? key}) : super(key: key);

      @override
      Widget build(BuildContext context) {
        final currency = context.watch<Currency?>();

        if (currency == null) {
          return const Center(child: CircularProgressIndicator());
        }

        // `currency` is promoted to non-null from here on, so only its
        // individual nullable fields still need a fallback.
        return Center(
          child: Column(
            mainAxisAlignment: MainAxisAlignment.center,
            children: [
              Container(
                child: Text('${currency.base ?? ''}${currency.quote ?? ''}'),
              ),
              Container(
                child: Text('${currency.rate ?? ''}'),
              ),
            ],
          ),
        );
      }
    }