Search code examples
angular typescript rxjs google-api-client rxjs6

Cancel concurrent HTTP requests after unsubscription


I have the following problem:

Many API calls go through an API interface (Google API) and have to be limited in requests per second and in concurrency because of the Google API's limits.

I use a subject (sink/call pool), which manages all API requests with mergeMap and returns a result to another, piped subject.

Because a consumer can unsubscribe from an API request before it finishes, such a request shouldn't block my sink. So I have to stop the API request (task) after unsubscription.

The issue: I don't know how to capture this unsubscribed state correctly. What I currently do is overwrite subscribe and unsubscribe to catch this state. It works, but it does not look very "rxjs"-ish to me.

How could I improve it?

import {Observable, Subject, Subscription, Subscribable, EMPTY} from 'rxjs';
import {mergeMap, tap} from 'rxjs/operators';

// Simulates a slow API request.
// Each subscription starts its own one-second timer, then emits 1 and
// completes. Unsubscribing before the timer fires cancels it, so an abandoned
// request does no further work.
function doHeavyRequest() {
    return new Observable<number>(subscriber => {
        // Simulate delay.
        const timer = setTimeout(() => {
            subscriber.next(1);
            subscriber.complete();
        }, 1000);

        // Teardown: runs on unsubscribe (and after completion, where clearing
        // an already-fired timer is a no-op).
        return () => clearTimeout(timer);
    });
}

// Request pool: every queued [task subject, id] pair is processed here, with
// mergeMap's concurrency argument limiting it to 2 requests in flight.
const sink = new Subject<[Subject<any>, number]>();

sink.pipe(
    mergeMap(([subject, id]) => {
        if (!subject.closed) {
            // Task is still open: run the request and forward its result.
            return doHeavyRequest().pipe(
                tap(res => {
                    if (subject.closed) {
                        // Task was closed while the request was running.
                        console.log('Request aborted:', id);
                        return;
                    }
                    subject.next(res);
                    subject.complete();
                })
            );
        }

        // Task was already closed when its turn came: skip the request.
        console.log('Request cancelled:', id);
        return EMPTY;
    }, 2)
).subscribe();

// Queue a request in the sink and return an observable for its result.
// The returned observable's subscribe is replaced so that every subscription
// it hands out gets a patched unsubscribe. The patched unsubscribe calls
// unsubscribe on the task subject (unless it is already stopped) before doing
// the normal subscription teardown; the sink checks the task's `closed` flag.
function getSomething(id: number) {
    const task = new Subject();
    const result = task.asObservable();

    result.subscribe = (...subscribeArgs: any[]) => {
        const subscription = Observable.prototype.subscribe.call(result, ...subscribeArgs);

        subscription.unsubscribe = () => {
            // Close the task first, then run the regular unsubscribe logic.
            if (!task.isStopped) {
                task.unsubscribe();
            }
            Subscription.prototype.unsubscribe.call(subscription);
        };

        return subscription;
    };

    sink.next([task, id]);

    return result;
}

// Make 3 requests (ids 0, 1, 2) and unsubscribe from each after its own delay.
export function test() {
    // Delay in milliseconds before unsubscribing, indexed by request id.
    const unsubscribeDelays = [1500, 900, 100];

    // Queue all requests first, then subscribe to them in id order.
    const requests = unsubscribeDelays.map((_, id) => getSomething(id));

    requests.forEach((request, id) => {
        const subscription = request.subscribe(value => {
            console.log(`${id}:`, value);
        });
        setTimeout(() => subscription.unsubscribe(), unsubscribeDelays[id]);
    });
}

See the test.ts at plunker and the console output:

https://next.plnkr.co/edit/KREjMprTrjHu2zMI?preview


Solution

  • Thanks to @Badashi, using finalize worked and looks much better:

    import {Observable, Subject, Subscription, Subscribable, EMPTY} from 'rxjs';
    import {mergeMap, tap, finalize} from 'rxjs/operators';
    
    // Simulates a slow API request.
    // Each subscription starts its own one-second timer, then emits 1 and
    // completes. Unsubscribing before the timer fires cancels it, so an
    // abandoned request does no further work.
    function doHeavyRequest() {
        return new Observable<number>(subscriber => {
            // Simulate delay.
            const timer = setTimeout(() => {
                subscriber.next(1);
                subscriber.complete();
            }, 1000);
    
            // Teardown: runs on unsubscribe (and after completion, where
            // clearing an already-fired timer is a no-op).
            return () => clearTimeout(timer);
        });
    }
    
    // Request pool: every queued [task subject, id] pair is processed here,
    // with mergeMap's concurrency argument limiting it to 2 requests in flight.
    const sink = new Subject<[Subject<any>, number]>();
    
    sink.pipe(
        mergeMap(([subject, id]) => {
            if (!subject.closed) {
                // Task is still open: run the request and forward its result.
                return doHeavyRequest().pipe(
                    tap(res => {
                        if (subject.closed) {
                            // Task was closed while the request was running.
                            console.log('Request aborted:', id);
                            return;
                        }
                        subject.next(res);
                        subject.complete();
                    })
                );
            }
    
            // Task was already closed when its turn came: skip the request.
            console.log('Request cancelled:', id);
            return EMPTY;
        }, 2)
    ).subscribe();
    
    // Queue a request in the sink and return an observable for its result.
    // Unsubscription is tracked with finalize: when a subscription to the
    // returned observable ends and the task subject has not been stopped yet,
    // unsubscribe is called on the task; the sink checks the task's `closed` flag.
    function getSomething(id: number) {
        const task = new Subject();
    
        const closeTaskIfPending = () => {
            if (task.isStopped) {
                return;
            }
            task.unsubscribe();
        };
        const result = task.pipe(finalize(closeTaskIfPending));
    
        sink.next([task, id]);
    
        return result;
    }
    
    // Make 3 requests (ids 0, 1, 2) and unsubscribe from each after its own delay.
    export function test() {
        // Delay in milliseconds before unsubscribing, indexed by request id.
        const unsubscribeDelays = [1500, 900, 100];
    
        // Queue all requests first, then subscribe to them in id order.
        const requests = unsubscribeDelays.map((_, id) => getSomething(id));
    
        requests.forEach((request, id) => {
            const subscription = request.subscribe(value => {
                console.log(`${id}:`, value);
            });
            setTimeout(() => subscription.unsubscribe(), unsubscribeDelays[id]);
        });
    }
    

    Output:

    0: 1
    Request cancelled: 2
    Request aborted: 1