In RxJS (ES6) I'm trying to get the result of a sequential series of operations in a single Observable.
I'm not sure whether I should use forkJoin (but I would like the operations to be executed sequentially) or the concat operator (but I would like to be notified only once, at the end, when all of them have completed).
I tried:
forkJoin
sync(): Observable<any> {
  return from(this.db.getProducts()).pipe(
    map(products => {
      if (!products) {
        return of(true);
      }
      const batch: Observable<any>[] = [];
      for (const product of products) {
        if (product.toBeSync) {
          batch.push(this.api.updateProduct(product));
        }
      }
      return forkJoin(batch);
    })
  );
}
concat
sync(): Observable<any> {
  return from(this.db.getProducts()).pipe(
    map(products => {
      if (!products) {
        return of(true);
      }
      const batch: Observable<any>[] = [];
      for (const product of products) {
        if (product.toBeSync) {
          batch.push(this.api.updateProduct(product));
        }
      }
      return concat(batch);
    })
  );
}
In both cases the observables coming from the HttpClient never do their job (no HTTP request is sent), although I can see the logging.
The method called in the batch is the following:
updateProduct(product: Product) {
  console.log('calling...');
  const options = this.getDefaultRequestOptions();
  return this.http.post('update_product', product, options);
}
I call the sync() function as follows:
this.productService.sync().subscribe(() => {
  console.log('sync done');
}, error => this.utils.handleError(error));
Output:
(8) calling...
sync done
but no HTTP request is started.
If I call the same method directly, outside the forkJoin/concat inside the piped map, I can see the HTTP request being sent:
sync(): Observable<any> {
  const product = new Product(null, 'Title', 'Desc.', 'CODE');
  return this.api.updateProduct(product);
}
What am I missing?
--- UPDATE - SOLUTION ---
sync(): Observable<any> {
  return from(this.db.getProducts()).pipe(
    // flatMap is an alias of mergeMap: it subscribes to the returned Observable
    flatMap(products => {
      if (!products) {
        return of(true);
      }
      const batch: Observable<any>[] = [];
      for (const product of products) {
        if (product.toBeSync) {
          batch.push(this.api.updateProduct(product));
        }
      }
      console.log(batch);
      return concat(...batch);
      // return forkJoin(batch);
    })
  );
}
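For anyone wondering why the map versions logged 'calling...' but sent nothing: map passes the inner Observable on to the subscriber as a plain value, and nothing ever subscribes to it, whereas flatMap/mergeMap subscribes to it. Here is a minimal sketch of that difference. Note that fakeUpdateProduct is a hypothetical stand-in for this.api.updateProduct(), built with defer so that the "request" only runs on subscription, which is my assumption about how the HttpClient call behaves.

```typescript
import { defer, of, Observable } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

const log: string[] = [];

// Hypothetical stand-in for this.api.updateProduct(): the first log line runs
// when the function is called, the "request" only runs on subscription.
function fakeUpdateProduct(id: number): Observable<string> {
  log.push(`calling ${id}`);
  return defer(() => {
    log.push(`request sent ${id}`);
    return of(`updated ${id}`);
  });
}

// map: the inner Observable reaches the subscriber as a plain value,
// so nobody subscribes to it and the request never runs.
of(1).pipe(
  map(id => fakeUpdateProduct(id))
).subscribe(value => {
  log.push(value instanceof Observable ? 'received an Observable' : 'received a result');
});

// mergeMap: the inner Observable is subscribed to, so the request runs
// and its result is what reaches the subscriber.
of(2).pipe(
  mergeMap(id => fakeUpdateProduct(id))
).subscribe(value => {
  log.push(`received ${value}`);
});

console.log(log);
```

In this sketch 'request sent 1' never shows up in the log, which matches what I was seeing: the logging ran, the subscriber was called, but no request went out.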
Try
sync(): Observable<any> {
  return from(this.db.getProducts()).pipe(
    map(products => {
      const batch: Observable<any>[] = [];
      for (const product of products) {
        batch.push(this.api.updateProduct(product));
      }
      return batch;
    }),
    switchMap(batch => forkJoin(batch)),
  );
}
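This assumes this.db.getProducts() yields the whole product array as a single value (for example a Promise that resolves to the array).
Note that from() applied to a plain synchronous array would emit the products one at a time instead.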
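To make that assumption concrete, here is a small sketch of how from() treats different inputs. The products array is made-up sample data, not the real return value of this.db.getProducts().

```typescript
import { from, of } from 'rxjs';

// Made-up sample data standing in for the product list.
const products = ['a', 'b', 'c'];

// from(array) emits every element separately: three emissions.
const fromArray: unknown[] = [];
from(products).subscribe(value => fromArray.push(value));

// of(array) emits the array itself: one emission.
const fromOf: unknown[] = [];
of(products).subscribe(value => fromOf.push(value));

// from(promise) emits the resolved value once, asynchronously.
from(Promise.resolve(products)).subscribe(value => {
  console.log('promise emitted', value);
});

console.log(fromArray.length, fromOf.length);
```

So the for...of loop over products inside map only makes sense when the whole array arrives as one emission, as in the Promise case.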
Then you can try:
this.productService.sync().subscribe(() => { console.log('sync done'); });
and see whether any API calls are made.
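If the requests have to run one after another and you only want a single notification at the end, one option is to combine concat with toArray. This is only a sketch under assumptions: fakeUpdate and syncAll are hypothetical names, and the inner observables are synchronous stand-ins for the real HTTP calls.

```typescript
import { concat, defer, of, Observable } from 'rxjs';
import { mergeMap, toArray } from 'rxjs/operators';

const events: string[] = [];

// Hypothetical stand-in for an HTTP call: the work starts on subscription.
function fakeUpdate(id: number): Observable<string> {
  return defer(() => {
    events.push(`start ${id}`);
    return of(`done ${id}`);
  });
}

// concat subscribes to each inner Observable only after the previous one
// has completed; toArray collects every result and emits once at the end.
function syncAll(ids: number[]): Observable<string[]> {
  return of(ids).pipe(
    mergeMap(list => concat(...list.map(id => fakeUpdate(id))).pipe(toArray()))
  );
}

syncAll([1, 2, 3]).subscribe(results => {
  events.push(`sync done: ${results.length}`);
});

console.log(events);
```

A side benefit of toArray is that an empty batch still produces one emission (an empty array), whereas forkJoin on an empty array completes without emitting, so a next callback such as the 'sync done' handler would never run in that case.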