rxdart BehaviorSubject unpredictable behavior


I started using the BehaviorSubject class because I need a broadcast stream, and the documentation says:

A special StreamController that captures the latest item that has been added to the controller, and emits that as the first item to any new listener.

Here is my test code:

import 'package:rxdart/rxdart.dart';

void main() {
  BehaviorSubject<int> subject = BehaviorSubject<int>.seeded(0);

  subject.stream.listen((a) => print("listener 1 : $a"));

  subject.add(1);
  subject.add(2);

  subject.stream.listen((a) => print("listener 2 : $a"));

  subject.add(3);

  subject.stream.listen((a) => print("listener 3 : $a"));

  subject.add(4);

  subject.close();
}

The result:

listener 1 : 0
listener 1 : 1
listener 2 : 2
listener 2 : 3
listener 3 : 3
listener 3 : 4
listener 1 : 2
listener 2 : 4
listener 1 : 3
listener 1 : 4

Although I do want the behavior where a new listener immediately receives the stored value of the stream, the order of the results looks unpredictable to me given the code I wrote.

I looked for a more detailed specification of the BehaviorSubject class, but I cannot figure out why the stream behaves this way.

I'd like to know how this API works.


Solution

  • A BehaviorSubject is not really a broadcast stream. It's a multi-cast stream (it allows multiple listeners), but the listeners don't all get the same events at the same time the way a broadcast stream's listeners would, since each new listener first gets the latest event at the time it starts listening.
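
    For example, here is a minimal sketch of the difference, using only dart:async and the rxdart API that your code already uses:

    import 'dart:async';
    import 'package:rxdart/rxdart.dart';

    Future<void> main() async {
      // A plain broadcast stream: an event added before anyone listens is lost.
      final broadcast = StreamController<int>.broadcast();
      broadcast.add(1); // dropped, nobody is listening yet
      broadcast.stream.listen((a) => print('broadcast listener : $a'));
      broadcast.add(2); // only this one is delivered
      await broadcast.close();

      // A BehaviorSubject: a late listener first receives the latest value.
      final subject = BehaviorSubject<int>();
      subject.add(1); // stored as the current latest value
      subject.stream.listen((a) => print('subject listener : $a'));
      subject.add(2); // this listener prints 1 (the replayed latest), then 2
      await subject.close();
    }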

    The most likely reason for the displayed behavior (I'm guessing, I haven't checked the BehaviorSubject implementation, I just happen to understand Dart streams very well) is that event delivery is asynchronous and events get enqueued.

    If you add a print("done"); after the subject.close();, it will print before any of the listener x : y lines. That's because all the code does until then is schedule microtasks that will deliver the events later.
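
    This asynchronous delivery is not an rxdart quirk; a plain dart:async StreamController behaves the same way, as this small sketch shows:

    import 'dart:async';

    void main() {
      final controller = StreamController<int>();
      controller.stream.listen((a) => print('listener : $a'));
      controller.add(1);
      controller.close();
      print('done'); // prints before 'listener : 1': the event is delivered in a later microtask
    }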

    It seems the BehaviorSubject schedules one microtask to deliver the first event to each new listener (probably calling its onData directly), then one further microtask the next time a value is added to the subject, which is when the value gets added to the listener's event queue. After that, adding more events to the queue won't trigger more microtasks. Instead the queued elements are delivered one by one, with a new microtask requested after each one.

    That will give the following behavior (notation: M1:L2[3] means microtask M1 is scheduled to deliver event 3 to listener 2):

    +─────────────+───────────────────────────────────────────────────────────────────────────────+──────────────────+
    | action      | microtasks                                                                    | prints           |
    +─────────────+───────────────────────────────────────────────────────────────────────────────+──────────────────+
    | `listen` 1  | M1:L1[0]                                                                      |                  |
    | `add(1)`    | M1:L1[0],M2:L1[1]                                                             |                  |
    | `add(2)`    | M1:L1[0],M2:L1[1,2]                                                           |                  |
    | `listen` 2  | M1:L1[0],M2:L1[1, 2],M3:L2[2]                                                 |                  |
    | `add(3)`    | M1:L1[0],M2:L1[1,2,3],M3:L2[2],M4:L2[3]                                       |                  |
    | `listen` 3  | M1:L1[0],M2:L1[1,2,3],M3:L2[2],M4:L2[3],M5:L3[3]                              |                  |
    | `add(4)`    | M1:L1[0],M2:L1[1,2,3,4],M3:L2[2],M4:L2[3,4],M5:L3[3],M6:L3[4]                 |                  |
    | `close`     | M1:L1[0],M2:L1[1,2,3,4,DONE],M3:L2[2],M4:L2[3,4,DONE],M5:L3[3],M6:L3[4,DONE]  |                  |
    | microtask   | M2:L1[1,2,3,4,DONE],M3:L2[2],M4:L2[3,4,DONE],M5:L3[3],M6:L3[4,DONE]           | `listener 1: 0`  |
    | microtask   | M3:L2[2],M4:L2[3,4,DONE],M5:L3[3],M6:L3[4,DONE],M7:L1[2,3,4,DONE]             | `listener 1: 1`  |
    | microtask   | M4:L2[3,4,DONE],M5:L3[3],M6:L3[4,DONE],M7:L1[2,3,4,DONE]                      | `listener 2: 2`  |
    | microtask   | M5:L3[3],M6:L3[4,DONE],M7:L1[2,3,4,DONE],M8:L2[4,DONE]                        | `listener 2: 3`  |
    | microtask   | M6:L3[4,DONE],M7:L1[2,3,4,DONE],M8:L2[4,DONE]                                 | `listener 3: 3`  |
    | microtask   | M7:L1[2,3,4,DONE],M8:L2[4,DONE],M9:L3[DONE]                                   | `listener 3: 4`  |
    | microtask   | M8:L2[4,DONE],M9:L3[DONE],M10:L1[3,4,DONE]                                    | `listener 1: 2`  |
    | microtask   | M9:L3[DONE],M10:L1[3,4,DONE],M11:L2[DONE]                                     | `listener 2: 4`  |
    | microtask   | M10:L1[3,4,DONE],M11:L2[DONE]                                                 |                  |
    | microtask   | M11:L2[DONE],M12:L1[4,DONE]                                                   | `listener 1: 3`  |
    | microtask   | M12:L1[4,DONE]                                                                |                  |
    | microtask   | M13:L1[DONE]                                                                  | `listener 1: 4`  |
    | microtask   |                                                                               |                  |
    +─────────────+───────────────────────────────────────────────────────────────────────────────+──────────────────+
    

    You can see this matches your result. So things work the way they do because of how they're implemented: event delivery re-schedules itself at the back of the microtask queue after delivering each event, so the longest queue gets done last.
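
    In fact, that scheme is enough to reproduce your exact output. Below is a toy model I wrote for illustration (it is not rxdart's actual code, just my guess at the bookkeeping), using only dart:async: each new listener gets one microtask that replays the latest value, and each listener has its own pending queue that is drained one event per microtask:

    import 'dart:async';

    // A toy model of the delivery scheme described above.
    // NOT rxdart's implementation; just enough to show the microtask bookkeeping.
    class ToyBehaviorSubject {
      ToyBehaviorSubject.seeded(this._latest);

      int _latest;
      final List<_ToyListener> _listeners = [];

      void listen(void Function(int) onData) {
        _listeners.add(_ToyListener(onData));
        final value = _latest;
        // One microtask to replay the latest value to the new listener (M1/M3/M5).
        scheduleMicrotask(() => onData(value));
      }

      void add(int value) {
        _latest = value;
        for (final listener in _listeners) {
          listener.enqueue(value);
        }
      }
    }

    class _ToyListener {
      _ToyListener(this.onData);

      final void Function(int) onData;
      final List<int> _pending = [];
      bool _draining = false;

      void enqueue(int value) {
        _pending.add(value);
        if (!_draining) {
          _draining = true;
          // The first value after an idle period schedules a drain (M2/M4/M6).
          scheduleMicrotask(_drainOne);
        }
      }

      void _drainOne() {
        onData(_pending.removeAt(0));
        if (_pending.isNotEmpty) {
          // One event per microtask, re-queued at the back after each delivery.
          scheduleMicrotask(_drainOne);
        } else {
          _draining = false;
        }
      }
    }

    void main() {
      final subject = ToyBehaviorSubject.seeded(0);
      subject.listen((a) => print("listener 1 : $a"));
      subject.add(1);
      subject.add(2);
      subject.listen((a) => print("listener 2 : $a"));
      subject.add(3);
      subject.listen((a) => print("listener 3 : $a"));
      subject.add(4);
      // Prints the same ten lines in the same order as the rxdart program
      // (minus the done events, which this toy does not model).
    }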

    You can explore the behavior yourself by printing when microtasks are scheduled and run:

    import 'dart:async';
    import 'package:rxdart/rxdart.dart';
    
    void main() {
      int ctr = 0;
      runZoned(() {
        BehaviorSubject<int> subject = BehaviorSubject<int>.seeded(0);
    
        print("listen 1");
    
        subject.stream.listen((a) => print("listener 1 : $a"), onDone: () {
          print('listener 1 done');
        });
        print("add 1");
        subject.add(1);
        print("add 2");
        subject.add(2);
        print("listen 2");
        subject.stream.listen((a) => print("listener 2 : $a"), onDone: () {
          print('listener 2 done');
        });
        print("add 3");
        subject.add(3);
        print("listen 3");
        subject.stream.listen((a) => print("listener 3 : $a"), onDone: () {
          print('listener 3 done');
        });
        print("add 4");
        subject.add(4);
        print("close");
        subject.close();
        print("done");
      }, zoneSpecification: ZoneSpecification(scheduleMicrotask: (s, p, z, f) {
        var id = ++ctr;
        print("Schedule microtask $id");
        p.scheduleMicrotask(z, () {
          print("Run microtask $id");
          f();
        });
      }));
    }
    

    which prints:

    listen 1
    Schedule microtask 1
    add 1
    Schedule microtask 2
    add 2
    listen 2
    Schedule microtask 3
    add 3
    Schedule microtask 4
    listen 3
    Schedule microtask 5
    add 4
    Schedule microtask 6
    close
    done
    Run microtask 1
    listener 1 : 0
    Run microtask 2
    listener 1 : 1
    Schedule microtask 7
    Run microtask 3
    listener 2 : 2
    Run microtask 4
    listener 2 : 3
    Schedule microtask 8
    Run microtask 5
    listener 3 : 3
    Run microtask 6
    listener 3 : 4
    Schedule microtask 9
    Run microtask 7
    listener 1 : 2
    Schedule microtask 10
    Run microtask 8
    listener 2 : 4
    Schedule microtask 11
    Run microtask 9
    Run microtask 10
    listener 1 : 3
    Schedule microtask 12
    Run microtask 11
    listener 3 done
    listener 2 done
    Run microtask 12
    listener 1 : 4
    Schedule microtask 13
    Run microtask 13
    Schedule microtask 14
    Run microtask 14
    listener 1 done
    

    (The "done" are delayed, probably because they need to wait for another future.)