I'm working in Angular and looking for a solution that uses Observables, as opposed to what I have now, which works but relies on Promises (async/await).
The endpoint accepts sorting and pagination parameters (pageSize and page for the latter) to retrieve the data. If you don't send any parameters, it responds with the maximum allowed number of items (currently 1000). The total number of items can be in the tens of thousands, or as low as a few. The API response always looks like this:
{
  items: [...], // (array of objects)
  totalNumberOfItems: 123 // (total number of database entries)
}
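For reference, the number of requests needed follows from those two fields once the first response is in. A minimal sketch, assuming a hypothetical helper name `pagesNeeded` and that every page except the last one is full:

```typescript
// Hypothetical helper (not part of the API): total number of requests,
// including the initial one, needed to fetch every item.
function pagesNeeded(totalNumberOfItems: number, pageSize: number): number {
  // An empty first page means there is nothing more to ask for.
  if (pageSize <= 0) {
    return 1;
  }
  // At least one request is always made, even when there are zero items.
  return Math.max(1, Math.ceil(totalNumberOfItems / pageSize));
}

console.log(pagesNeeded(25000, 1000)); // 25
console.log(pagesNeeded(1001, 1000));  // 2
console.log(pagesNeeded(7, 7));        // 1
```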
Currently, what I have is something like this (I'm omitting try-catches and unnecessary stuff), with Promises:
async getAllItems(): Promise<any[]> {
  let page: number = 1;
  let pageSize: number;
  let items: any[] = [];
  let totalNumberOfItems: number;
  // Initial httpClient call that returns an Observable by default
  let initialResponse = await this.someService.getItems().toPromise();
  items = [...items, ...initialResponse.items];
  pageSize = initialResponse.items.length;
  totalNumberOfItems = initialResponse.totalNumberOfItems;
  // Loop until we get them all (compare the accumulated items, not the first page)
  while (items.length < totalNumberOfItems) {
    page++;
    let nextResponse = await this.someService.getItems({ page, pageSize }).toPromise();
    items = [...items, ...nextResponse.items];
  }
  return items;
}
(Or something along those lines, I'm typing from memory.)
Essentially, I can't figure out how to pipe properly (heh) from the initial API call. Something like this as a thought:
interface IResponse {
  items: any[];
  totalNumberOfItems: number;
}
getAllItems(): Observable<IResponse> {
  let page: number = 1;
  let pageSize: number;
  let totalNumberOfItems: number;
  return this.someService.getItems().pipe(
    tap(res => {
      pageSize = res.items.length;
      totalNumberOfItems = res.totalNumberOfItems;
    }),
    // TODO: Now what?
    // Other call using the same getItems() but with parameters, making it iterative
    // and adding it to the result of the initial request?
  );
}
EDIT: Based on the answer by thisdotutkarsh, I've done this, and it works as intended (it could probably be more refined, but still works):
getAllItems(): Observable<ItemsResponse> {
  let page: number = 1;
  let pageSize: number;
  let totalNumberOfItems: number;
  return this.service.getItems().pipe(
    tap(response => {
      pageSize = response.items.length;
      totalNumberOfItems = response.totalNumberOfItems;
    }),
    expand(response => {
      if (response.items.length < totalNumberOfItems) {
        const numRepeats = Math.ceil(totalNumberOfItems / pageSize);
        // Pre-increment: page 1 was already fetched by the initial call
        return (page < numRepeats) ? this.service.getItems({ page: ++page, pageSize }) : EMPTY;
      } else {
        return EMPTY;
      }
    }),
    reduce((acc, response) => {
      acc.items = [...acc.items, ...response.items];
      return acc;
    })
  );
}
You can achieve the expected behavior using the expand and reduce RxJS operators.
The expand operator recursively projects each source value to an Observable, which is then merged into the output Observable. The reduce operator applies an accumulator function over the source Observable and returns the accumulated result when the source completes.
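import { EMPTY } from 'rxjs';
import { catchError, expand, reduce, tap } from 'rxjs/operators';

getAllItems() {
  let page = 1;
  let pageSize: number;
  let totalNumberOfItems: number;
  let fetched = 0; // running count of items received so far
  return this.service.getItems().pipe(
    tap(response => {
      /* tap sits before expand, so this side effect runs once per subscribe, for the initial response only */
      pageSize = response.items.length;
      totalNumberOfItems = response.totalNumberOfItems;
    }),
    expand(response => {
      fetched += response.items.length;
      // Request the next page until everything is fetched; stop on an empty page to avoid looping forever
      return fetched < totalNumberOfItems && response.items.length > 0
        ? this.service.getItems({ page: ++page, pageSize })
        : EMPTY;
    }),
    reduce((acc, response) => acc.concat(response.items), [] as any[]),
    catchError(error => {
      console.log(error);
      return EMPTY;
    })
  );
}

// Usage
this.getAllItems().subscribe(items => {
  ...
});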
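If you want to try the pattern outside Angular, here is a minimal self-contained sketch. It assumes a hypothetical in-memory `makeFakeGetItems` standing in for your real service, and it wraps the counters in `defer` so every subscription starts again from page 1. All names other than the RxJS imports are made up for illustration.

```typescript
import { defer, EMPTY, Observable, of } from 'rxjs';
import { expand, reduce, tap } from 'rxjs/operators';

interface ItemsResponse<T> {
  items: T[];
  totalNumberOfItems: number;
}

interface PageParams {
  page: number;
  pageSize: number;
}

type GetItems<T> = (params?: PageParams) => Observable<ItemsResponse<T>>;

// Hypothetical stand-in for the HTTP service: serves slices of an array and
// falls back to page 1 with the maximum page size when no params are sent.
function makeFakeGetItems(all: number[], maxPageSize: number): GetItems<number> {
  return (params?: PageParams) => {
    const page = params?.page ?? 1;
    const pageSize = params?.pageSize ?? maxPageSize;
    const start = (page - 1) * pageSize;
    return of({
      items: all.slice(start, start + pageSize),
      totalNumberOfItems: all.length,
    });
  };
}

function getAllItems<T>(getItems: GetItems<T>): Observable<T[]> {
  // defer creates fresh counters for each subscription.
  return defer(() => {
    let page = 1;
    let pageSize = 0;
    let fetched = 0;
    return getItems().pipe(
      // Runs only for the initial response, because it sits before expand.
      tap(first => {
        pageSize = first.items.length;
      }),
      // Runs for the initial response and for every page it triggers.
      expand(response => {
        fetched += response.items.length;
        const done =
          response.items.length === 0 || fetched >= response.totalNumberOfItems;
        return done ? EMPTY : getItems({ page: ++page, pageSize });
      }),
      // Emits once, when the last page has been received.
      reduce((acc, response) => acc.concat(response.items), [] as T[])
    );
  });
}

// Usage: 25 items served 10 at a time, so three requests are made.
const allNumbers = Array.from({ length: 25 }, (_, i) => i);
getAllItems(makeFakeGetItems(allNumbers, 10)).subscribe(items => {
  console.log(items.length); // 25
});
```

Because reduce only emits on completion, the subscriber receives a single array rather than one emission per page. If you would rather show pages as they arrive, scan could be swapped in for reduce.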