Search code examples
javascriptrxjsfrpreactive-extensions-js

How to create an Observable that only fires when it has subscribers, and provides the latest value to new subscribers immediately


I'm trying to create a stream/observable that...

  1. Only outputs events when it has subscribers
  2. Provides any new subscribers with the latest value.

The concrete case is that I need an observable that makes an Async API call whenever a particular event happens, but only if it has subscribers. I'm trying to avoid unnecessary API calls.

I've managed to create a stream that only fires when it has subscribers like this...

let dataStream = Rx.Observable
   .interval(1000) // Fire an event every second
   .singleInstance() // Only do something when we have subscribers
   .startWith(null) // kick start as soon as something subscribes
   .flatMapLatest(interval => SomeAPI.someDataGet()) // get data, returns a promise

And this works. If I console.log(...) in the SomeAPI.someDataGet method, I only see it firing when the stream has subscribers. And my implementation looks really nice because I do this to subscribe and unsubscribe which fits in very nicely with React component lifecycle methods.

let sub1;
sub1 = dataStream.subscribe(x => console.log('sub1', x));
sub1.dispose();

I also want any new subscribers to receive the latest value the instant they subscribe. This is where I'm struggling. If I do this...

let sub1, sub2;
sub1 = dataStream.subscribe(x => console.log('sub1', x));

setTimeout( () => {
    sub2 = dataStream.subscribe(x => console.log('sub2', x));
}, 1500)

...I don't see the console.log for sub2 until the next interval.

If my understanding is correct. I need a Hot Observable. So I have tried to create a stream like this...

let dataStream = Rx.Observable
   .interval(1000) // Fire an event every second
   .singleInstance() // Only do something when we have subscribers
   .startWith(null) // kick start as soon as something subscribes
   .flatMapLatest(interval => SomeAPI.someDataGet()) // get data
   .publish() // Make this a hot observable;

Which as I understand it, should make dataStream a hot observable.

However, in my tests the second subscription still doesn't receive data until the next interval. In addition, this would introduce the requirement to connect and disconnect the dataStream when subscribing which is something I would like to avoid if possible.

I'm brand new to RxJS and I would not be surprised if I've misunderstood what's happening here.


Solution

  • Instead of .publish(), use .shareReplay(1).