javascript, stream, rxjs, reactive-programming, reactivex

rxjs: use stream referring twice to an observable that has already completed upstream


I'm having trouble getting a value from a hot observable after it has already been used upstream.

const partialState$ = Rx.Observable.of(1)
  .publish()
  .refCount();

const downstreamState$ = partialState$.map(v => [v + 1, v + 2]);

const finalState$ = downstreamState$
  // the last value of partialState$ is requested here;
  // it shouldn't matter if it has completed through earlier chaining
  .withLatestFrom(partialState$);

finalState$.subscribe(state => { // this does not fire
  console.log('state', state);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-rc.5/Rx.js"></script>

Here's the fiddle: http://jsfiddle.net/j64te6jp/10/

The code above logs nothing. It seems that, because subscribing is synchronous, partialState$ has no value ready at the moment downstreamState$ requests the latest one from it, so nothing is ever pushed to the subscribe callback.

Could someone please explain why this is happening, and suggest a good way to reuse the same value from a hot (published) observable? Thanks.


Solution

  • Here's the solution suggested to me, with many thanks to https://github.com/Dorus:

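    // Cold source; the side effect shows that it is subscribed only once.
    const partialState$ = Rx.Observable.of(1)
        .do(() => console.log('i will be run once'));

    // publish(selector) shares a single subscription to partialState$ among
    // everything built inside the selector.
    const finalState$ = partialState$.publish(partialState_ => {
        const downstreamState$ = partialState_.map(v => [v + 1, v + 2]);
        return downstreamState$.withLatestFrom(partialState_);
    });

    finalState$.subscribe(state => {
        console.log('state', state);
    });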
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-rc.5/Rx.js"></script>
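
As for why this helps, a rough model in plain JavaScript (no RxJS) may make the ordering easier to see. With publish().refCount(), the first branch to subscribe triggers the connection; Observable.of(1) then emits and completes synchronously, before the second branch has attached, so the two branches never both see the value. With publish(selector), both branches are attached to the shared subject before the source is subscribed. The helpers below (createSubject, ofSync, shareOnFirstSubscriber, shareViaSelector, wire) are hypothetical stand-ins written for this sketch, not RxJS APIs, and they ignore unsubscription and errors.

```javascript
// Hypothetical stand-ins for illustration only; this is not the RxJS implementation.

// A bare-bones subject: forwards values to current listeners and tells
// late subscribers only that the stream has already completed.
function createSubject() {
  const listeners = [];
  let done = false;
  return {
    subscribe(listener) {
      if (done) { listener.complete(); return; }
      listeners.push(listener);
    },
    next(value) { listeners.slice().forEach(l => l.next(value)); },
    complete() { done = true; listeners.slice().forEach(l => l.complete()); },
  };
}

// Stand-in for Observable.of(value): emits synchronously, then completes.
function ofSync(value) {
  return sink => { sink.next(value); sink.complete(); };
}

// Model of publish().refCount(): connects as soon as the first subscriber arrives.
function shareOnFirstSubscriber(source) {
  const subject = createSubject();
  let connected = false;
  return {
    subscribe(listener) {
      subject.subscribe(listener);
      if (!connected) { connected = true; source(subject); }
    },
  };
}

// Model of publish(selector): the selector attaches every branch first,
// and only then is the source subscribed.
function shareViaSelector(source, selector) {
  const subject = createSubject();
  const result = selector(subject);
  source(subject);
  return result;
}

// Hand-wired "map, then withLatestFrom"; returns the array that collects output.
function wire(shared) {
  const results = [];
  let latest;
  let hasLatest = false;
  // Branch A: remember the latest raw value (the withLatestFrom side).
  shared.subscribe({ next: v => { latest = v; hasLatest = true; }, complete() {} });
  // Branch B: map the value, then pair it with the latest raw value, if any.
  shared.subscribe({
    next: v => { if (hasLatest) results.push([[v + 1, v + 2], latest]); },
    complete() {},
  });
  return results;
}

const refCounted = wire(shareOnFirstSubscriber(ofSync(1)));
const viaSelector = shareViaSelector(ofSync(1), wire);

console.log('refCount model:', JSON.stringify(refCounted));  // refCount model: []
console.log('selector model:', JSON.stringify(viaSelector)); // selector model: [[[2,3],1]]
```

In the refCount model, branch B attaches after the subject has completed, so it only ever sees completion. In the selector model, both branches are in place when the single value arrives, which matches the [[2, 3], 1] that the solution above logs.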