
Observable with Stateful Lifecycle


The general design problem can be described as:

I have a websocket connection that has a strict lifecycle to respect—it wants connect and disconnect to be called appropriately, and, because it talks to the system, it uses . Within this websocket connection, we have multiple different Subscription objects, each with a strict lifecycle that it wants to be respected (subscribe and unsubscribe), and it depends on the state of its parent websocket for those operations to be successful.

Here's a timeline of the ideal behavior for three nested lifecycle observables, where C depends on B which depends on A:


A = someInput.switchMap((i) => LifecycleObservable())
B = A.switchMap((a) => LifecycleObservable())
C = B.switchMap((b) => LifecycleObservable())

C.listen(print);

// <-- listen to c
// <-- produce [someInput]
setup A
setup B
setup C
// <-- c is produced

// <-- c is unsubscribed
teardown C
teardown B
teardown A

// <-- C is re-subscribed-to
setup A
setup B
setup C

// <-- produce [someInput]
teardown C
teardown B
teardown A
setup A
setup B
setup C
// <-- c is produced

First question: Is this an anti-pattern? I haven't been able to find much about this pattern on the web, but it seems like a pretty standard sort of thing you'd run into with observables: some objects just have a lifecycle and some objects might want to depend on that.

I can get pretty close to this ideal behavior using something like this:

class LifecycleObservable {
  static Observable<T> fromObservable<T>({
    @required Observable<T> input,
    @required Future<void> Function(T) setup,
    @required Future<void> Function(T) teardown,
  }) {
    return input.asyncMap((T _input) async {
      await setup(_input);
      return _input;
    }).switchMap((T _input) {
      return Observable<T>(Observable.never()) //
          .startWith(_input)
          .doOnCancel(() async {
        await teardown(_input);
      });
    });
  }
}

This code accepts a stream of stateful objects, running setup on them as they're produced and teardown on them as the sub-observable within the switchMap is cancelled.
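
To make the setup-on-listen, teardown-on-cancel idea concrete without RxDart, here is a minimal sketch using only `dart:async`. The names `lifecycleStream` and `runDemo` are hypothetical and not part of the code above, and the sketch assumes null-safe Dart (2.12+). It covers a single lifecycle object only, not the nested case.

```dart
import 'dart:async';

/// Hypothetical helper: a never-ending stream that runs [setup] when it is
/// listened to, emits [value] once, and runs [teardown] when cancelled.
Stream<T> lifecycleStream<T>(
  T value, {
  required FutureOr<void> Function(T) setup,
  required FutureOr<void> Function(T) teardown,
}) {
  late final StreamController<T> controller;
  controller = StreamController<T>(
    onListen: () async {
      await setup(value);
      controller.add(value);
    },
    // The value returned here is what subscription.cancel() waits on.
    onCancel: () => teardown(value),
  );
  return controller.stream;
}

Future<List<String>> runDemo() async {
  final log = <String>[];
  final subscription = lifecycleStream<String>(
    'A',
    setup: (v) => log.add('setup $v'),
    teardown: (v) => log.add('teardown $v'),
  ).listen((v) => log.add('value $v'));

  // Give the event loop a turn so the value is delivered.
  await Future<void>.delayed(Duration.zero);
  await subscription.cancel();
  return log;
}

Future<void> main() async {
  print(await runDemo()); // [setup A, value A, teardown A]
}
```

Because `onCancel` returns the teardown's future, a caller that awaits `cancel()` knows the teardown has finished before it continues.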

The problem occurs when the second [someInput] from the idealized timeline is produced. With the code above I get a call graph like this:

// <-- listen to c
// <-- produce [someInput]
setup A
setup B
setup C
// <-- c is produced

// <-- produce [someInput]
teardown A
setup A
teardown B
setup B
teardown C
setup C
// <-- c is produced

The problem is that if B depends on A (for example, a subscription that must call unsubscribe over an open websocket transport), this teardown order breaks the expected lifecycle of each object: the subscription tries to send unsubscribe over a websocket transport that is already closed.


Solution

  • It seems to me that, quite simply, the observable pattern cannot express these semantics. Specifically, the observable pattern is not designed for cascading dependencies—parent observables know nothing about the state of their child observables.

    I solved this problem for myself with the following dart code. I'm sure it's terrible, but it seems to work for me in general™.

    
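    import 'dart:async';

    import 'package:meta/meta.dart';

    /// Pairs a value with its setup/teardown hooks and an optional parent.
    class WithLifecycle<T> {
      final FutureOr<void> Function() setup;
      final FutureOr<void> Function() teardown;
      final T value;
      final WithLifecycle parent;

      final List<WithLifecycle> _children = [];
      bool _disposed = false;

      WithLifecycle({
        @required this.value,
        this.setup,
        this.teardown,
        this.parent,
      });

      void addDependency(WithLifecycle child) => _children.add(child);
      void removeDependency(WithLifecycle child) => _children.remove(child);

      /// Registers with the parent (if any), then runs setup.
      Future<void> init() async {
        parent?.addDependency(this);
        // setup is optional, so guard against a null callback.
        if (setup != null) await setup();
      }

      /// Disposes all children first, then runs teardown. Safe to call twice.
      Future<void> dispose() async {
        if (_disposed) {
          return;
        }

        _disposed = true;
        for (var _child in _children) {
          await _child.dispose();
        }
        _children.clear();
        // teardown is optional, so guard against a null callback.
        if (teardown != null) await teardown();
      }
    }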
    

    which is then used to create the necessary dependency chain when using observables:

    class LifecycleObservable {
      static Observable<WithLifecycle<T>> fromObservable<T>({
        @required Observable<T> value,
        WithLifecycle parent,
        @required Future<void> Function(T) setup,
        @required Future<void> Function(T) teardown,
      }) {
        return value.concatMap((T _value) {
          final withLifecycle = WithLifecycle<T>(
            value: _value,
            parent: parent,
            setup: () => setup(_value),
            teardown: () => teardown(_value),
          );
          return Observable<WithLifecycle<T>>(Observable.never())
              .startWith(withLifecycle)
              .doOnListen(() async {
            await withLifecycle.init();
          }).doOnCancel(() async {
            await withLifecycle.dispose();
          });
        });
      }
    }
    

    which is used like

    token$ = PublishSubject();
    channel$ = token$.switchMap((token) {
      return LifecycleObservable.fromObservable<IOWebSocketChannel>(
          value: Observable.just(IOWebSocketChannel.connect(Constants.connectionString)),
          setup: (channel) async {
            print("setup A ${channel.hashCode}");
          },
          teardown: (channel) async {
            print("teardown A ${channel.hashCode}");
            await channel.sink.close(status.goingAway);
          });
    });

    streams$ = channel$.switchMap((channel) {
      return LifecycleObservable.fromObservable<Stream<String>>(
        parent: channel,
        value: Observable.just(channel.value.stream.cast<String>()),
        setup: (thing) async {
          print("setup B ${thing.hashCode}");
        },
        teardown: (thing) async {
          print("teardown B ${thing.hashCode}");
        },
      );
    });

    messages = streams$.flatMap((i) => i.value).share();
    
    

    and ends up with a call graph like the one below:

    // <- push [token]
    flutter: setup A 253354366
    flutter: setup B 422603720
    // <- push [token]
    flutter: teardown B 422603720
    flutter: teardown A 253354366
    flutter: setup A 260164938
    flutter: setup B 161253018
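
The children-first disposal that produces this ordering can be checked in isolation, without RxDart or a websocket. The sketch below is a simplified stand-in for the WithLifecycle class above, not the answer's actual code: `LifecycleNode` and `runDemo` are hypothetical names, it assumes null-safe Dart (2.12+), and it records calls in a list instead of running real setup and teardown work. Unlike the class above, each node also detaches itself from its parent on dispose, which is why the loop iterates over a copy of the child list.

```dart
/// Hypothetical, simplified stand-in for WithLifecycle: each node logs its
/// own setup/teardown and disposes its children before itself.
class LifecycleNode {
  LifecycleNode(this.name, this.log, {this.parent});

  final String name;
  final List<String> log;
  final LifecycleNode? parent;
  final List<LifecycleNode> _children = [];
  bool _disposed = false;

  Future<void> init() async {
    parent?._children.add(this);
    log.add('setup $name');
  }

  Future<void> dispose() async {
    if (_disposed) return; // idempotent: a second call does nothing
    _disposed = true;
    // Iterate over a copy, because each child removes itself from _children.
    for (final child in List.of(_children)) {
      await child.dispose();
    }
    parent?._children.remove(this);
    log.add('teardown $name');
  }
}

Future<List<String>> runDemo() async {
  final log = <String>[];
  final a = LifecycleNode('A', log);
  final b = LifecycleNode('B', log, parent: a);
  final c = LifecycleNode('C', log, parent: b);

  await a.init();
  await b.init();
  await c.init();

  await a.dispose(); // cascades to B and C before A tears down
  await b.dispose(); // already disposed, so nothing is logged
  return log;
}

Future<void> main() async {
  // [setup A, setup B, setup C, teardown C, teardown B, teardown A]
  print(await runDemo());
}
```

The `_disposed` flag matters here because the observable's own `doOnCancel` may fire for a child after its parent has already disposed it.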