Search code examples
flutterdartdart-isolates

How to break up Dart Isolate code to avoid blocking the event queue?


I am writing a test program to explore the use of Isolates in Dart/Flutter. One type of isolate that I have is started and stopped using a switch on a Flutter UI. This sends a message (START and STOP) to the isolate and I use a ValueNotifier to detect these commands and respond to them. When I initially wrote this, the Isolate ran continuously and didn’t respond the to the STOP command, which I understand is because the event queue would never be empty to process it.

Based on the first answer to this thread... How to terminate a long running isolate #2

… and the suggested approach on this blog page: https://hackernoon.com/executing-heavy-tasks-without-blocking-the-main-thread-on-flutter-6mx31lh

… I have tried to break my code up using Futures. I have split my run block into runWorker() and runChunk() functions, with runChunk called repeatedly as long as the Isolate should be running (run == true). I am doing something wrong because the Isolate still runs away and does not process the STOP command. I get slightly different results depending on whether I call runChunk directly or using Future.delayed (as per the hackernoon blog page), but neither approach works.

I believe that the STOP code works because if I remove the processing loop then everything triggers as expected, and I had an earlier version of this that worked when I included a 'Future.delayed' of 1 microsecond between each counter loop. So assume I am just using Futures incorrectly and not freeing up the event queue between 'runChunk' calls.

Can anyone tell me what I am doing wrong here? Here is the code for my isolate...

import 'dart:async';
import 'dart:isolate';

import 'package:flutter/material.dart';

class ContinuousIsolator {
  ContinuousIsolator(
      {required int channel, required void Function(double) setCounter}) {
    print('Isolator initialisation');
    _channel = channel;
    _setCounter = setCounter;
    spawn();
  }
  late int _channel; // The id of this isolate
  late void Function(double) _setCounter;
  late SendPort _port;
  late Isolate _isolate;
  ReceivePort receivePort = ReceivePort();

  // Spawn a new isolate to complete the countdown (or up)
  // channel = the number of the isolate
  // counter = the value to count down from (or up to)
  void spawn() {
    print('Isolator establishing receiver');
    receivePort.listen((msg) {
      // print('Isolator message received');

      // Unpack the map from the returned string
      Map<int, dynamic> map = Map<int, dynamic>.from(msg);

      // There should be only one key:value pair
      for (var key in map.keys) {
        msg = map[key]; // Extract the message
      }
      // print('Channel $_channel received "$msg" of type ${msg.runtimeType}');

      // If we have received a Sendport, then add it to the port map
      if (msg is SendPort) {
        _port = msg;
      } else {
        // Otherwise process the message
        // If it contains 'END' then we need to terminate the isolate
        switch (msg) {
          case 'END':
            _isolate.kill();
            // Isolate has completed, then close this receiver port
            receivePort.close();
            break;
          default:
            _setCounter(msg); // Send message to display
            break;
        }
      }
    });

    // Start the isolate then let's get working on the countdown timer
    Isolate.spawn(worker, {_channel: receivePort.sendPort}).then((isolate) {
      _isolate = isolate; // Capture isolate so we can kill it later
    });
  }

  void run() {
    print('Sending START to worker');
    _port.send('START');
  }

  void stop() {
    print('Sending STOP to worker');
    _port.send('STOP');
  }

  void end() {
    _port.send('END'); // Send counter value to start countdown
  }
}

void worker(Map<int, dynamic> args) {
  int? id; // Number for this channel
  ReceivePort receivePort = ReceivePort(); // Receive port for
  SendPort? sendPort;
  ValueNotifier<String> message = ValueNotifier('');
  const double start = 10000000;
  double counter = start;
  const int chunkSize = 1000;
  bool down = true;
  bool run = true;

  // Unpack the args to get the id and sendPort.
  // There should be only one key:value pair
  dynamic msg = '';
  Map<int, dynamic> map = Map<int, dynamic>.from(args);
  for (var key in map.keys) {
    id = key; // Extract the isolate id
    msg = map[key]; // Extract the message
  }
  // First message should contain the receivePort for the main isolate
  if (msg is SendPort) {
    sendPort = msg;
    // print('args: $args    port: $sendPort');
    print('worker $id sending send port');
    sendPort.send({id: receivePort.sendPort});
  }

  double getCounter() {
    return counter;
  }

  void setCounter(double value) {
    counter = value;
  }

  bool getDown() {
    return down;
  }

  void setDown(bool value) {
    down = value;
  }

  Future runChunk(
      int chunkSize,
      bool Function() getDown,
      void Function(bool) setDown,
      double Function() getCounter,
      void Function(double) setCounter) {
    const double start = 10000000;

    print('Running chunk, counter is ${getCounter()}');
    for (int i = 0; i < chunkSize; i++) {
      // print('Worker $id in the while loop');
      if (getDown() == true) {
        setCounter(getCounter() - 1);
        if (getCounter() < 0) setDown(!getDown());
      } else {
        setCounter(getCounter() + 1);
        if (getCounter() > start) setDown(!getDown());
      }
      if ((getCounter() ~/ 1000) == getCounter() / 1000) {
        // print('Continuous Counter is ${getCounter()}');
        sendPort!.send({id: getCounter()});
      } // Send the receive port
    }
    return Future.value();
  }

  void changeMessage() {
    print('Message has changed to ${message.value}');
    if (message.value == 'START') {
      run = true;
    } else {
      run = false;
    }
  }

  void runWorker() async {
    message.addListener(changeMessage);

    print('Worker running counter down from $counter');
    while (run == true) {
      // This line appears to run the isolate, but there is no output/feedback to the GUI
      // The STOP command does not interrupt operation.
      Future.delayed(const Duration(microseconds: 0),
          () => runChunk(chunkSize, getDown, setDown, getCounter, setCounter));
      // This line runs the isolate with feedback to the GUI
      // The STOP command does not interrupt operation.
      runChunk(chunkSize, getDown, setDown, getCounter, setCounter);
    }

    message.removeListener(changeMessage);
  }

  // Establish listener for messages from the controller
  print('worker $id establishing listener');
  receivePort.listen((msg) {
    print('worker $id has received $msg');
    switch (msg) {
      case 'START':
        print('Worker $id starting run');
        message.value = msg;
        runWorker();
        break;
      case 'STOP':
        print('Worker $id stopping run');
        message.value = msg;
        break;
      case 'END':
        message.removeListener(changeMessage);
        receivePort.close;
        break;
      default:
        break;
    }
  });
}

Solution

  • Ok, so it turns out that the problem was not with the Futures, but with this line of code in the runWorker() method:

    while (run == true) {

    ... which (I think) was blocking the event queue.

    I also think I was over-thinking things using a ValueNotifier (although I don't think there is any reason why this wouldn’t work).

    I have simplified my code so that the main computation method (runChunk) checks if it should still be running after each chunk, and if the answer is yes then it just calls itself again to run the next chunk.

    The run flag is set to true and runChunk called when START is received from the parent and the run flag is set to false when STOP is received. This removes the need for the ValueNotifier.

    Everything works as expected now and the isolate computation (‘runChunk’) can be started and stopped on demand.

    Here is the revised code (the print lines are debug lines that I have left in):

    import 'dart:async';
    import 'dart:isolate';
    
    class ContinuousIsolator {
      ContinuousIsolator(
          {required int channel, required void Function(double) setCounter}) {
        print('Isolator initialisation');
        _channel = channel;
        _setCounter = setCounter;
        spawn();
      }
      late int _channel; // The id of this isolate
      late void Function(double)
          _setCounter; // Callback function to set on-screen counter
      late SendPort _port; // SendPort of child isolate to communicate with
      late Isolate _isolate; // Pointer to child isolate
      ReceivePort receivePort = ReceivePort(); // RecevierPort for this class
    
      // Spawn a new isolate to complete the countdown (or up)
      // channel = the number of the isolate
      // counter = the value to count down from (or up to)
      void spawn() {
        print('Isolator establishing receiver');
        // Establish a listener for messages from the child
        receivePort.listen((msg) {
          // Unpack the map from the returned string (child sends a single map
          // contained key: isolate id and value: message)
          Map<int, dynamic> map = Map<int, dynamic>.from(msg);
    
          // There should be only one key:value pair received
          for (var key in map.keys) {
            msg = map[key]; // Extract the message
          }
    
          // If we have received a Sendport, then capture it to communicate with
          if (msg is SendPort) {
            _port = msg;
          } else {
            // Otherwise process the message
            // If it contains 'END' then we need to terminate the isolate
            switch (msg) {
              case 'END':
                _isolate.kill();
                // Isolate has completed, then close this receiver port
                receivePort.close();
                break;
              default:
                _setCounter(msg); // Send message to display
                break;
            }
          }
        });
    
        // Start the child isolate
        Isolate.spawn(worker, {_channel: receivePort.sendPort}).then((isolate) {
          _isolate = isolate; // Capture isolate so we can kill it later
        });
      }
    
      // Class method to start the child isolate doing work (countdown timer)
      void run() {
        print('Sending START to worker');
        _port.send('START');
      }
    
      // Class method to stop the child isolate doing work (countdown timer)
      void stop() {
        print('Sending STOP to worker');
        _port.send('STOP');
      }
    
      // Class method to tell the child isolate to self-terminate
      void end() {
        _port.send('END'); // Send counter value to start countdown
      }
    }
    
    // Child isolate function that is spawned by the parent class ContinuousIsolator
    // Called initially with single map of key: 'unique channel id' and value:
    // receiver port from the parent
    void worker(Map<int, dynamic> args) {
      int? id; // Unique id number for this channel
      ReceivePort receivePort = ReceivePort(); // Receive port for this isolate
      SendPort? sendPort; // Send port to communicate with the parent
      const double start = 10000000; // Starting counter value
      double counter = start; // The counter
      const int chunkSize =
          100; // The number of counter decrements/increments to process per 'chunk'
      bool down = true; // Flag to show is counting 'down' (true) or 'up' (false)
      bool run = false; // Flag to show if the isolate is running the computation
    
      // Unpack the initial args to get the id and sendPort.
      // There should be only one key:value pair
      dynamic msg = '';
      Map<int, dynamic> map = Map<int, dynamic>.from(args);
      for (var key in map.keys) {
        id = key; // Extract the isolate id
        msg = map[key]; // Extract the message
      }
      // The first message should contain the receivePort for the main isolate
      if (msg is SendPort) {
        sendPort = msg; // Capture sendport to communicate with the parent
        print('worker $id sending send port');
        // Send the receiver port for this isolate to the parent
        sendPort.send({id: receivePort.sendPort});
      }
    
      // Method to get the current counter value
      double getCounter() {
        return counter;
      }
    
      // Method to set the current counter value
      void setCounter(double value) {
        counter = value;
      }
    
      // Method to get the down flag value
      bool getDown() {
        return down;
      }
    
      // Method to set the down flag value
      void setDown(bool value) {
        down = value;
      }
    
      // This function does the main work of the isolate, ie the computation
      Future<void> runChunk(
          int chunkSize, // The number of loops to process for a given 'chunk'
          bool Function() getDown, // Callback to get bool down value
          void Function(bool) setDown, // Callback to set bool down value
          double Function() getCounter, // Call back to get current counter value
          void Function(double) setCounter) // Callback to set counter value
      async {
        const double start = 10000000; // Starting value for the counter
    
        // Count down (or up) the counter for chunkSize iterations
        for (int i = 0; i < chunkSize; i++) {
          // Counting down...
          if (getDown() == true) {
            setCounter(getCounter() - 1);
            // If reached zero, flip the counting up
            if (getCounter() < 0) setDown(!getDown());
          } else {
            // Counting up...
            setCounter(getCounter() + 1);
            // If reached start (max), flip the counting down
            if (getCounter() > start) setDown(!getDown());
          }
          // Update the display every 1000 points
          if ((getCounter() ~/ 1000) == getCounter() / 1000) {
            sendPort!.send({id: getCounter()}); // Notify parent of the new value
          }
        }
    
        // If the isolate is still running (parent hasn't sent 'STOP') then
        // call this function again to iterate another chunk.  This gives the event
        // queue a chance to process the 'STOP'command from the parent
        if (run == true) {
          // I assume Future.delayed adds call back onto the event queue
          Future.delayed(const Duration(microseconds: 0), () {
            runChunk(chunkSize, getDown, setDown, getCounter, setCounter);
          });
        }
      }
    
      // Establish listener for messages from the controller
      print('worker $id establishing listener');
      receivePort.listen((msg) {
        print('worker $id has received $msg');
        switch (msg) {
          case 'START':
            // Start the worker function running and set run = true
            print('Worker $id starting run');
            run = true;
            runChunk(chunkSize, getDown, setDown, getCounter, setCounter);
            break;
          case 'STOP':
            // Set run = false to stop the worker function
            print('Worker $id stopping run');
            run = false;
            break;
          case 'END':
            // Inform parent that isolate is shutting down
            sendPort?.send({id: msg});
            receivePort.close; // Close the receiver port
            break;
          default:
            break;
        }
      });
    }