Say I have an observable:
declare function allocateLeakyThing(input: string): { dispose: () => void; };
const somethingLeaky$ = input$.pipe(
  map(allocateLeakyThing)
);
where allocateLeakyThing() returns a resource that has to be disposed manually by calling its dispose() method in order to prevent a memory leak. If input$ emits a new value, I should dispose of the previously allocated resource; if the observable is unsubscribed, it should also dispose of the resource, if one was ever allocated.
For the first purpose I can write:
const somethingLeaky$ = input$.pipe(
  withLatestFrom(somethingLeaky$.pipe(startWith(undefined))),
  tap(([input, somethingLeaky]) => somethingLeaky?.dispose()),
  map(([input]) => allocateLeakyThing(input))
);
What about the second purpose? Is there a way to do this inline within this observable (i.e. without involving another subject)?
You can use the finalize() operator, whose callback runs when the chain is torn down (on unsubscribe, and also on completion or error). Since RxJS 7.3 you can also use tap({ finalize: () => {} }).
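As a rough sketch of how finalize() could cover both purposes from the question: the idea below keeps the latest resource in a per-subscription variable (via defer()), disposes it before allocating the next one, and disposes the last one in finalize(). The helper name allocatePerInput, the Disposable interface, and the recording stub for allocateLeakyThing() are assumptions made up for illustration; they are not part of the question or of RxJS.

```typescript
import { Observable, Subject, defer } from "rxjs";
import { finalize, map } from "rxjs/operators";

interface Disposable {
  dispose: () => void;
}

// Hypothetical stand-in for the question's allocateLeakyThing();
// it only records which inputs had their resource disposed.
const disposed: string[] = [];
function allocateLeakyThing(input: string): Disposable {
  return { dispose: () => { disposed.push(input); } };
}

// Hypothetical helper: one live resource per subscription.
function allocatePerInput(input$: Observable<string>): Observable<Disposable> {
  // defer() runs the factory on every subscribe, so each subscriber
  // gets its own `current` slot instead of sharing one.
  return defer(() => {
    let current: Disposable | undefined;
    return input$.pipe(
      map((input) => {
        current?.dispose(); // first purpose: drop the previous resource
        current = allocateLeakyThing(input);
        return current;
      }),
      // second purpose: drop the last resource on unsubscribe/complete/error
      finalize(() => current?.dispose())
    );
  });
}

const input$ = new Subject<string>();
const subscription = allocatePerInput(input$).subscribe();
input$.next("a");
input$.next("b");           // disposes the resource created for "a"
subscription.unsubscribe(); // disposes the resource created for "b"
```

Keeping the state inside defer() rather than in a variable outside the pipe is a design choice here: it avoids two subscribers disposing each other's resources.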
In your situation you could also wrap the logic that creates the resource in an Observable (basically the same thing as in "Is there a way to make a destructor for RXJS Observables?"):
new Observable(observer => {
  // create resource
  // ...
  // Return teardown function that will be called when the Observable is unsubscribed.
  return () => resource.close();
});
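Applied to the question's case, the wrapping approach might look something like the sketch below. It assumes the resource exposes dispose() as in the question (rather than close()), and the helper name withLeakyThing plus the logging stub for allocateLeakyThing() are hypothetical names introduced only for this example.

```typescript
import { Observable, Subject } from "rxjs";

interface Disposable {
  dispose: () => void;
}

// Hypothetical stand-in for allocateLeakyThing() that logs its lifecycle.
const log: string[] = [];
function allocateLeakyThing(input: string): Disposable {
  log.push(`allocate ${input}`);
  return { dispose: () => { log.push(`dispose ${input}`); } };
}

// Hypothetical helper wrapping allocation and cleanup in one Observable.
function withLeakyThing(input$: Observable<string>): Observable<Disposable> {
  return new Observable<Disposable>((subscriber) => {
    let current: Disposable | undefined;
    const inner = input$.subscribe({
      next: (input) => {
        current?.dispose(); // release the previous resource first
        current = allocateLeakyThing(input);
        subscriber.next(current);
      },
      error: (err) => subscriber.error(err),
      complete: () => subscriber.complete(),
    });
    // Teardown: runs on unsubscribe, and also after complete or error.
    return () => {
      inner.unsubscribe();
      current?.dispose();
      current = undefined;
    };
  });
}

const source$ = new Subject<string>();
const sub = withLeakyThing(source$).subscribe();
source$.next("x");
source$.next("y");
sub.unsubscribe();
```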