javascript typescript rxjs rxjs5 rxjs6

RxJS issue with the concat operator when executing operations sequentially


I am using RxJS 6 to run two async operations whose order matters.

This chunk of code works perfectly:

dbmsProxy.createDatastores().subscribe(() => {
    UsersDAO.insert(users).subscribe(() => {
        console.log('FINISHED ALL THE CHAIN');
    });
});

But when I try to use RxJS's concat, the second operation is executed before the first one finishes:

concat([dbmsProxy.createDatastores(), UsersDAO.insert(users)]).subscribe();

Below are the DBMSProxy methods:

public createDatastores(): Observable<string> {
    const _this: DBMSProxy = this;
    const subject = new Subject<string>();
    const subscription: Subscription = UsersDAO.createDatastore().subscribe(
        onSuccess,
        onError,
        onFinally
    );
    return subject;

    function onSuccess(datastore: Nedb): void {
        console.log(`USERS Datastore Created Successfully`);
        _this.db.users = datastore;
        subject.next('success');
    }

    function onError(err: string) {
        subject.error('error');
        console.error(err);
    }

    function onFinally() {
        subject.complete();
        subscription.unsubscribe();
    }
}

public insertDocuments(documents: any, datastore: Nedb): Subject<any> {
    const subject = new Subject<any>();
    datastore.insert(documents, onInsert);
    return subject;

    function onInsert(err: Error, newDocuments: any) {
        if (err) {
            subject.error(err);
        } else {
            // add to the documents to insert the id just created from nedb when inserting the document
            documents.forEach((document: any, ind: number) => {
                document.id = newDocuments[ind]._id;
            });
            subject.next(documents);
        }
        subject.complete();
    }
}

And below are the UsersDAO methods:

public static createDatastore(): Subject<Nedb | string> {
    const subject = new Subject<Nedb | string>();
    const datastore = new Nedb({
        filename: USERS_DATASTORE_FULL_NAME,
        autoload: true,
        onload
    });
    return subject;

    function onload(err: Error) {
        if (err) {
            subject.error(
                `Error creating USERS datastore: ${err.name} - ${err.message}`
            );
        } else {
            subject.next(datastore);
        }
        subject.complete();
    }
}

public static insert(users: User[]): Observable<any> {
    return DBMSProxy.getInstance()
        .insertDocuments(users, DBMSProxy.getInstance().db.users)
        .pipe(catchError((val: any) => of('Error inserting the users')));
}

Any idea what's going on?


Solution

  • My current solution is to convert the Subject to an Observable, wrap the second call in defer so that its Observable is created only when concat subscribes to it, and remove the square brackets (otherwise concat emits the Observables themselves rather than their results). This seems to work:

    const operations = concat(
        dbmsProxy.createDatastores().asObservable(),
        defer(() => UsersDAO.insert(users))
    );
    operations.subscribe(onSubscribe);
    function onSubscribe(result: any) {
        console.log('Finished all: ', result);
    }
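
    To illustrate why both changes matter, here is a minimal sketch using synchronous stand-ins rather than the real NeDB calls. The helper task and the names createStore and insertUsers are hypothetical and not part of the code above; the sketch only assumes the standard concat and defer functions from rxjs.

```typescript
import { Observable, concat, defer } from 'rxjs';

// Hypothetical stand-in for createDatastores() / UsersDAO.insert():
// records in `log` the moment it is called, then returns an Observable
// that emits one value and completes when subscribed to.
function task(name: string, log: string[]): Observable<string> {
    log.push(`${name} called`);
    return new Observable<string>(subscriber => {
        subscriber.next(`${name} done`);
        subscriber.complete();
    });
}

// 1) Eager: both calls run while the arguments are being evaluated,
//    before concat has subscribed to anything.
const eagerLog: string[] = [];
const eager = concat(task('createStore', eagerLog), task('insertUsers', eagerLog));
eagerLog.push('subscribe');
eager.subscribe(value => eagerLog.push(value));
// eagerLog: createStore called, insertUsers called, subscribe,
//           createStore done, insertUsers done

// 2) Deferred: each factory runs only when concat subscribes to it,
//    so the second call happens after the first Observable completes.
const lazyLog: string[] = [];
const lazy = concat(
    defer(() => task('createStore', lazyLog)),
    defer(() => task('insertUsers', lazyLog))
);
lazyLog.push('subscribe');
lazy.subscribe(value => lazyLog.push(value));
// lazyLog: subscribe, createStore called, createStore done,
//          insertUsers called, insertUsers done

// 3) Array argument: concat treats the array as a single input and emits
//    its elements (the inner Observables) without subscribing to them.
const wrappedLog: string[] = [];
const wrapped: unknown[] = [];
concat([task('a', wrappedLog), task('b', wrappedLog)])
    .subscribe(value => wrapped.push(value));
// wrapped holds two Observable instances; wrappedLog has only "called" entries.
```

    In the original code the eager call presumably matters even more, because UsersDAO.insert(users) reads DBMSProxy.getInstance().db.users at call time, and that field is only assigned once createDatastores() has emitted.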