Search code examples
rxjs, angular6, observable, angularfire2

Timeout after n-seconds if a certain value is not emitted


I am using angularfire2 to push an object with status NEW to Firebase. My backend listens for writes on that list and acts on each new request whose status is NEW. I want to handle three possible outcomes: SUCCESS, ERROR, and timeout.

/**
 * Queues a request to add `book` and waits for the request's outcome.
 *
 * Pipeline, in order:
 *  1. take the first emission of `this.authentication.user`;
 *  2. call `book.setOwner(user.uid)` and push the book, with
 *     `status.code = 'NEW'`, onto the list at `ADD_BOOK_QUEUE_PATH`;
 *  3. watch the pushed entry via `valueChanges()` and keep only its `status`;
 *  4. let through only statuses whose code is 'SUCCESS' or 'ERROR';
 *  5. map the status to a result (SUCCESS) or throw (ERROR);
 *  6. apply a 60 000 ms `timeout`.
 *
 * NOTE(review): this is the version that shows the problem described below it
 * (the timeout fires even when SUCCESS or ERROR is received). See the inline
 * notes for the likely cause.
 *
 * @param book the book to enqueue; it is mutated via `setOwner`.
 * @returns the observable built by the pipe above.
 */
add(book: Book) {
        return this.authentication.user.pipe(
            // Only the first user emission is used.
            take(1),
            switchMap(user => {
                // Set owner for the backend to handle correctly
                book.setOwner(user.uid);

                // Add book request
                const queueRef = this._afqueue.list(this.ADD_BOOK_QUEUE_PATH);
                const pushPromise = queueRef.push({ status: { code: 'NEW' }, ...book })
                    .then(ref => {
                        console.log('Request to add a new book added to queue.');
                        return ref;
                    }) as Promise<any>;
                // Convert the push promise into an observable that yields the new ref.
                return from(pushPromise);
            }),
            // Watch the entry that was just pushed, addressed by its key.
            switchMap(ref => {
                return this._afqueue.object(this.ADD_BOOK_QUEUE_PATH + '/' + ref.key)
                    .valueChanges();
            }),
            map(snap => snap['status']),
            // Ignore every status except the two terminal codes.
            filter(status => status['code'] === 'SUCCESS' || status['code'] === 'ERROR'),
            switchMap(status => {
                if (status['code'] === 'SUCCESS') {
                    // NOTE(review): `observer.complete` presumably ignores its argument, so this
                    // inner observable likely completes WITHOUT emitting `book_id`. The outer
                    // `valueChanges()` stream also appears to stay open, so nothing downstream
                    // emits or completes and `timeout` would still fire — confirm against the
                    // RxJS Observer contract.
                    return Observable.create(function(observer) {
                        observer.complete(status['book_id']);
                      });
                    //return status['book_id'];
                }
                else if (status['code'] === 'ERROR') {
                    // Throws the backend-provided error from inside the projection.
                    throw(status['error']);
                }
            }),
            timeout(60000), // timeout after 60 seconds
        );
    }

The timeout occurs whether I receive an ERROR or a SUCCESS. How can I time out only if I receive neither within 60 seconds?


Solution

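  • I rewrote it this way, which works. The key change is `first()` placed before `timeout`: the stream now completes on the first SUCCESS/ERROR status, so the timeout only fires when neither arrives within 60 seconds: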

    /**
     * Queues a request to add `book` and resolves with the backend's outcome.
     *
     * The book is pushed to the list at `ADD_BOOK_QUEUE_PATH` with
     * `status.code = 'NEW'`; the pushed entry is then watched until its status
     * code becomes 'SUCCESS' or 'ERROR'.
     *
     * @param book the book to enqueue; it is mutated via `setOwner`.
     * @returns an observable that
     *   - emits `status['book_id']` when the status code is 'SUCCESS';
     *   - errors with `status['error']` when the status code is 'ERROR';
     *   - errors via `timeout` when neither status arrives within 60 000 ms.
     */
    add(book: Book) {
        return this.authentication.user.pipe(
            // Only the first user emission is needed to stamp the owner.
            take(1),
            mergeMap(user => {
                // Set owner for the backend to handle correctly
                book.setOwner(user.uid);

                // Add book request
                const queueRef = this._afqueue.list(this.ADD_BOOK_QUEUE_PATH);
                const pushPromise = queueRef.push({ status: { code: 'NEW' }, ...book })
                    .then(ref => {
                        console.log('Request to add a new book added to queue.');
                        return ref;
                    }) as Promise<any>;
                return from(pushPromise);
            }),
            // Watch the entry that was just pushed, addressed by its key.
            mergeMap(ref => {
                return this._afqueue.object(this.ADD_BOOK_QUEUE_PATH + '/' + ref.key)
                    .valueChanges();
            }),
            map(snap => snap['status']),
            filter(status => (status['code'] === 'SUCCESS' || status['code'] === 'ERROR')),
            // Complete on the first terminal status so `timeout` cannot fire afterwards.
            first(),
            timeout(60000), // timeout after 60 seconds
            mergeMap(status => {
                console.log(status);
                console.log(status['code']);
                if (status['code'] === 'SUCCESS') {
                    return of(status['book_id']);
                }
                // The filter above lets through only SUCCESS or ERROR, so this is the
                // ERROR case. The observable from `throwError` MUST be returned:
                // without `return`, the projection yields `undefined` and RxJS raises
                // a generic TypeError instead of the backend's error.
                return throwError(status['error']);
            })
        );
    }