Is there an operator that I can use to "dispose" a resource allocated in a pipeline?


Say I have an observable:

declare function allocateLeakyThing(input: string): { dispose: () => void; };

const somethingLeaky$ = input$.pipe(
  map(allocateLeakyThing)
);

where allocateLeakyThing() returns a resource that has to be disposed manually, by calling its dispose() method, to prevent a memory leak. If input$ emits a new value, I should dispose the previously allocated resource; if the observable is unsubscribed, it should also dispose the resource, if one was ever allocated.

For the first purpose I can write:

const somethingLeaky$ = input$.pipe(
  withLatestFrom(somethingLeaky$.pipe(startWith(undefined))),
  tap(([input, somethingLeaky]) => somethingLeaky?.dispose()),
  map(([input])=>allocateLeakyThing(input))
);

What about the second purpose? Is there a way to do this inline with this observable (i.e. without involving another subject)?


Solution

  • You can use the finalize() operator. Its callback is called when the chain is disposed: on unsubscribe, and also when the source completes or errors.

    Since RxJS 7.3, you can also use tap({ finalize: () => {} }).

    In your situation you could also wrap the logic that creates the resource in an Observable (basically the same approach as in "Is there a way to make a destructor for RXJS Observables?"):

    new Observable(observer => {
      // create resource
      // ...
    
      // Return teardown function that will be called when the Observable is unsubscribed.
      return () => resource.close();
    });
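Applied to the question's pipeline, the wrapping idea might look like the sketch below. It assumes that combining the wrapped Observable with switchMap() is acceptable here: switchMap() unsubscribes the previous inner Observable whenever input$ emits, and unsubscribing the outer chain tears down the current one, so both purposes are covered without an extra subject. The names leakyThing$ and disposed are hypothetical, and allocateLeakyThing() is replaced by a stand-in that only records dispose() calls.

```typescript
import { Observable, Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

type Leaky = { dispose: () => void };

// Hypothetical stand-in for the question's allocateLeakyThing(); it records
// every dispose() call so the behaviour can be observed.
const disposed: string[] = [];
function allocateLeakyThing(input: string): Leaky {
  return { dispose: () => { disposed.push(input); } };
}

// Wraps one allocation in an Observable whose teardown disposes it.
function leakyThing$(input: string): Observable<Leaky> {
  return new Observable<Leaky>(subscriber => {
    const resource = allocateLeakyThing(input);
    subscriber.next(resource);
    // Deliberately no complete(): the resource stays alive until teardown.
    return () => resource.dispose();
  });
}

const input$ = new Subject<string>();
const somethingLeaky$ = input$.pipe(switchMap(leakyThing$));

const subscription = somethingLeaky$.subscribe();
input$.next('a');
input$.next('b');           // switchMap unsubscribes the inner Observable for 'a'
subscription.unsubscribe(); // tears down the inner Observable for 'b'
```

After these calls, the stand-in has recorded a dispose() for 'a' (when 'b' arrived) and for 'b' (on unsubscribe). Note that each subscriber to somethingLeaky$ gets its own allocation; if several consumers should share one resource, a sharing operator would be needed on top of this.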