I am having trouble getting the sum (or any reduction) of an inner number property of an Observable within another Observable.
I have an Observable array of "Account" objects (Observable<AppAccount[]>
export interface AppAccount {
_id?: string;
name: string;
And an Observable array of "Balance" objects, each with an accountId. many balances can be associated with an account (sorted/filtered by date, but that piece is removed for brevity)
export interface AccountBalance {
_id?: string;
accountId: string;
amount: number;
I have a helper method which returns just the last Balance object's Amount for a given Account.
getLastAmount(account: AppAccount): Observable<number> {
return this.balanceService.balances$.pipe(
map(balances => {
let last = balances.filter(balance => {
return balance.accountId === account._id;
return last ? last.amount : 0;
tap(amount => console.log(`getLastAmount() => ${amount}`)),
Now I am trying to write a method which will loop through the Accounts, call getLastAmount() for each, and then sum them all and return an Observable. This is what I have managed so far:
getTotalBalance(accounts$: Observable<AppAccount[]>): Observable<number> {
return accounts$.pipe(
map(accounts => from(accounts)),
mergeMap(account => this.getLastAmount(account)),
reduce((sum, current) => {
console.log(`${sum} + ${current}`);
return sum + current;
}, 0)
But this seems to never return, and gets stuck in an infinite loop??
With just one account and one balance associated, with the balance having an 'amount' of '10', I get this from my console log: '0 + 10' over and over, and the network log also confirm it is calling getBalances() continuously.
Am I on the right track? Is there a better way? Why does this RXJS pipe get stuck in a loop?
EDIT: I've made some changes based on picci's suggestions:
getTotalBalance(accounts$: Observable<AppAccount[]>): Observable<number> {
return accounts$.pipe(
map(accounts => accounts.map(account => this.getLastAmount(account))),
concatMap(balances$ => { console.log('balances$', balances$); return forkJoin(balances$); }),
tap(balances => console.log('balances', balances)),
map(balances => balances.reduce(
(amountSum, amount) => {
console.log(`${amountSum} + ${amount}`)
amountSum = amountSum + amount;
return amountSum
}, 0))
But this is still not returning, or the pipe is not completing? I've made a stackblitz here: https://stackblitz.com/edit/angular-rxjs-nested-obsv If you check the console output, it seems to not get any further than the forkJoin call...
If I understand right, you could proceed like this
// somehow you start with the observable which returns the array of accounts
const accounts$: Observable<AppAccount[]> = getAccounts$()
// you also set the date you are interested in
const myDate: Moment = getDate()
// now you build the Observable<number> which will emit the sum of the last balance amounts
const amountSum$: Observable<number> = accounts$.pipe(
// you transform an array of accounts in an array of Observable<number> representing the last Balance amount
map((accounts: Account[]) => {
// use the getLastAmount function you have coded
return accounts.map(account => getLastAmount(account, myDate))
// now we trigger the execution of the Observable in parallel using concatMap, which basically mean wait for the source Observable to complete
// and forkJoin which actually executes the Observables in parallel
concatMap(accounts$ => forkJoin(accounts$)),
// now that we have an array of balances, we reduce them to the sum using the Array reduce method
map(balances => balances.reduce(
(amountSum, amount) => {
amountSum = amountSum + amount;
return amountSum
}, 0)
// eventually you subscribe to the amountSum$ Observable to get the result
next: amountSum => console.log(`The sum of the last balances is: ${amountSum}`),
error: console.err,
complete: () => console.log("I am done")
There may be other combinations that bring to the same result, but this seems to work and can be checked in this stackblitz.
