Search code examples
javascriptnode.jsrxjsstreamobservable

How to make rxjs v7 streams hot everywhere


I'm using RxJS v7.

I noticed when I create new observable from scratch or using map, scan, merge or other operators, the observable called multiple times unless using share if I use a variable holding it multiple times. Like below

let share_a = false
let share_b = false
if (process.argv[2] === 'share_a') {
  share_a = true
}
if (process.argv[3] === 'share_b') {
  share_b = true
}
console.log({ share_a, share_b })
const { map, merge, tap, share, Observable } = require('rxjs')
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))
let a = new Observable(async subscriber => {
  await sleep(100)
  console.log('a called')
  subscriber.next('a')
})
if (share_a) {
  a = a.pipe(share())
}
let b = a.pipe(map(() => 'b'), tap(x => console.log(`${x} called`)))
if (share_b) {
  b = b.pipe(share())
}
const c = merge(a, b).pipe(map(() => 'c'), tap(x => console.log(`${x} called`)))
merge(a,b,c).subscribe(console.log)
% node src/rxjsTest.js
{ share_a: false, share_b: false }
a called
a
a called
b called
b
a called
c called
c
a called
b called
c called
c
% node src/rxjsTest.js share_a
{ share_a: true, share_b: false }
a called
a
b called
b
c called
c
b called
c called
c
% node src/rxjsTest.js share_a share_b
{ share_a: true, share_b: true }
a called
a
b called
b
c called
c
c called
c
%

How can I make every observable shared without calling share every time?


Solution

  • There's no settings which would do this for you and it's pretty atypical to want this and it comes with serious performance implications.

    That being said, you an just create a new function/method to handle this for you.

    For example:

    Observable.prototype.pipeHot = function (...operators) {
      return this.pipe(
        ...operators,
        share()
      );
    }
    
    // Make **a** without mixing observables and promises
    let a = timer(100).pipeHot(
      map(_ => 'a'), 
      tap(v => console.log(`${v} called`))
    );
    
    let b = a.pipeHot(
      map(_ => 'b'), 
      tap(v => console.log(`${v} called`))
    );
    
    const c = merge(a, b).pipeHot(
      map(_ => 'c'), 
      tap(v => console.log(`${v} called`))
    );
    
    merge(a,b,c).subscribe(console.log);