Any help is appreciated. I am somewhat new to RxJS, and I think I have a substantial misunderstanding of how Observables are mapped.
I store two observables via NgRx for one account: "receivedMessages" and "sentMessages". Each is simply an array of "MessageModel", which contains a toUsername and a fromUsername.
I'm trying to combine these two observables into one observable of "MessageThreads" that I can display, holding all the correspondence between two users. It would be an array of objects with "username" as a unique key, almost like a HashMap. Each object looks like: {username: string, thread: MessageModel[]}
I've tried using forkJoin, concat and merge, but I'm getting a strange error, which leads me to think I haven't understood what NgRx or RxJS is actually doing :(
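To make the target shape concrete, here is roughly what I expect to end up with. This is only an illustration: MessageThread is a name I'm using just for this sketch, the usernames are made up, and the real MessageModel has more fields than the two shown.

```typescript
// Stand-in for the real model; only the two fields relevant to grouping.
interface MessageModel {
  fromUsername: string;
  toUsername: string;
}

// One entry per correspondent, keyed by username (hypothetical name).
interface MessageThread {
  username: string;
  thread: MessageModel[];
}

// Made-up data: 'me' is the logged-in account, 'alice' and 'bob' are other users.
const exampleThreads: MessageThread[] = [
  {
    username: 'alice',
    thread: [
      { fromUsername: 'alice', toUsername: 'me' },
      { fromUsername: 'me', toUsername: 'alice' },
    ],
  },
  {
    username: 'bob',
    thread: [{ fromUsername: 'me', toUsername: 'bob' }],
  },
];
```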
inbox-container.component.ts
import { Component, OnInit } from '@angular/core';
import { AppState } from 'src/app/store/app.reducers';
import { Store } from '@ngrx/store';
import { MessageModel } from '../../shared/models/message.model';
import { Observable, forkJoin, of, from, pipe } from 'rxjs';
import { map, concat, combineLatest, mergeMap, flatMap, switchMap, mapTo, merge } from 'rxjs/operators';
import { stringify } from '@angular/compiler/src/util';
@Component({
selector: 'app-inbox-container',
templateUrl: './inbox-container.component.html',
styleUrls: ['./inbox-container.component.css']
})
export class InboxContainerComponent implements OnInit {
receivedMessages: Observable<MessageModel[]>;
sentMessages: Observable<MessageModel[]>;
uniqueThreads: Observable<{ username: string, thread: MessageModel[] }[]>;
constructor(private store: Store<AppState>) {
this.receivedMessages = this.store.select('inbox', 'receivedMessages');
this.sentMessages = this.store.select('inbox', 'sentMessages');
}
ngOnInit() {
this.uniqueThreads =
of(
forkJoin(
this.receivedMessages,
this.sentMessages
),
map((messages: MessageModel[]): Observable<{ username: string, thread: MessageModel[] }[]> => {
let mergedThreads: { username: string, thread: MessageModel[] }[] = [];
messages.forEach((message) => {
let activeThreadFrom = mergedThreads.find(any => any.username === message.fromUsername)
if (activeThreadFrom === null) {
mergedThreads.push({ username: message.fromUsername, thread: [message] })
} else {
activeThreadFrom.thread.push(message);
}
mergedThreads.push(activeThreadFrom);
let activeThreadTo = mergedThreads.find(any => any.username === message.toUsername)
if (activeThreadTo === null) {
mergedThreads.push({ username: message.toUsername, thread: [message] })
} else {
activeThreadTo.thread.push(message);
}
mergedThreads.push(activeThreadTo)
})
return of(
mergedThreads
)
}
)
)
}
}
inbox-container.component.html
<div class="col-xs-12">
<div class="col-xs-4">
<div class="list-group">
<a class="list-group-item" routerLinkActive="active" routerLink="compose">
<h4 class="list-group-item-heading">
<div style="margin-top: 10px; margin-right: 10px; padding-right:10px;" class="dropdown" appSearchBarDirective
[text]=searchText.value>
<input class="form-control" type="text" placeholder="Username.." aria-label="Search" #searchText>
<div class="dropdown-menu">
<app-reactive-accounts-search [searchForm]=searchText.value></app-reactive-accounts-search>
</div>
</div>
</h4>
</a>
<div class="list-group">
<div class="list-group-item" *ngFor="let thread of ( uniqueThreads | async)" appHighlightDirective
style="cursor: pointer;">
<div class="list-group-item-heading">
<h4>{{ thread.username }}
<div class="badge pull-right">4</div>
</h4>
</div>
</div>
</div>
</div>
</div>
<div class="col-xs-8">
<div class="panel panel-default">
<router-outlet></router-outlet>
</div>
</div>
<button (click) = "wtf()">asdasd</button>
</div>
I'm getting an error in the console logs:
InboxContainerComponent.html:10 ERROR Error: Cannot find a differ supporting object 'function mapOperation(source) {
if (typeof project !== 'function') {
throw new TypeError('argument is not a function. Are you looking for `mapTo()`?');
}
return source.lift(new MapOperator(project, thisArg));
}' of type 'mapOperation'. NgFor only supports binding to Iterables such as Arrays.
at NgForOf.push../node_modules/@angular/common/fesm5/common.js.NgForOf.ngDoCheck (common.js:3184)
at checkAndUpdateDirectiveInline (core.js:22101)
at checkAndUpdateNodeInline (core.js:23362)
at checkAndUpdateNode (core.js:23324)
at debugCheckAndUpdateNode (core.js:23958)
at debugCheckDirectivesFn (core.js:23918)
at Object.eval [as updateDirectives] (InboxContainerComponent.html:16)
at Object.debugUpdateDirectives [as updateDirectives] (core.js:23910)
at checkAndUpdateView (core.js:23306)
at callViewActi
And also a warning in the IDE:
Type 'Observable<Observable<[MessageModel[], MessageModel[]]> | OperatorFunction<MessageModel[], Observable<{ username: string; thread: MessageModel[]; }[]>>>' is not assignable to type 'Observable<{ username: string; thread: MessageModel[]; }[]>'.
Type 'Observable<[MessageModel[], MessageModel[]]> | OperatorFunction<MessageModel[], Observable<{ username: string; thread: MessageModel[]; }[]>>' is not assignable to type '{ username: string; thread: MessageModel[]; }[]'.
Type 'Observable<[MessageModel[], MessageModel[]]>' is missing the following properties from type '{ username: string; thread: MessageModel[]; }[]': l
Any help would be much appreciated!
-- SOLUTION ---
I just tied into the NgRx store state; as the answer said, there is no need to create additional observables.
init() {
  this.uniqueThreads = this.store.select('inbox').pipe(
    map((state: fromInbox.State) => {
      return [
        state.receivedMessages,
        state.sentMessages
      ];
    }),
    map(([receivedMessages, sentMessages]) => {
      const mergedThreads: { username: string, thread: { message: MessageModel, received: boolean }[] }[] = [];
      const messages: MessageModel[] = [...receivedMessages, ...sentMessages];
      messages.forEach((message) => {
        console.log(message);
        const activeThreadFrom = mergedThreads.find(any => any.username === message.fromUsername);
        if (activeThreadFrom === undefined) {
          mergedThreads.push({ username: message.fromUsername, thread: [{ message: message, received: true }] });
        } else {
          activeThreadFrom.thread = [...activeThreadFrom.thread, { message: message, received: true }];
        }
        const activeThreadTo = mergedThreads.find(any => any.username === message.toUsername);
        if (activeThreadTo === undefined) {
          mergedThreads.push({ username: message.toUsername, thread: [{ message: message, received: false }] });
        } else {
          activeThreadTo.thread = [...activeThreadTo.thread, { message: message, received: false }];
        }
      });
      console.log(mergedThreads);
      return mergedThreads;
    })
  );
}
Hi, I don't see any reason to use of() here, and you are using forkJoin() and map() incorrectly inside it. When you combine two or more observables via forkJoin(), it already returns an observable, so you can call .pipe() on it and do the map() inside it. See my example:
ngOnInit() {
  // Note: forkJoin emits only after every source observable completes. Store
  // selectors normally stay open, so combineLatest may be needed if nothing is emitted.
  this.uniqueThreads =
    forkJoin(
      this.receivedMessages,
      this.sentMessages
    ).pipe(
      // map returns the plain array; pipe already gives back an Observable of it.
      map(([receivedMessages, sentMessages]): { username: string, thread: MessageModel[] }[] => {
        const mergedThreads: { username: string, thread: MessageModel[] }[] = [];
        const messages: MessageModel[] = [...receivedMessages, ...sentMessages];
        messages.forEach((message) => {
          // Array.prototype.find returns undefined (not null) when nothing matches.
          const activeThreadFrom = mergedThreads.find(any => any.username === message.fromUsername);
          if (activeThreadFrom === undefined) {
            mergedThreads.push({ username: message.fromUsername, thread: [message] });
          } else {
            activeThreadFrom.thread.push(message);
          }
          const activeThreadTo = mergedThreads.find(any => any.username === message.toUsername);
          if (activeThreadTo === undefined) {
            mergedThreads.push({ username: message.toUsername, thread: [message] });
          } else {
            activeThreadTo.thread.push(message);
          }
        });
        return mergedThreads;
      })
    );
}
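The grouping step can also be pulled out of map() into a plain function, which makes it testable without any observables. This is only a minimal sketch, assuming that fromUsername and toUsername are the only fields that matter for grouping. groupIntoThreads and MessageThread are hypothetical names. Unlike the code above, it files each message only under the other party (the sender for received messages, the recipient for sent ones), so the account's own username does not get a thread of its own.

```typescript
// Stand-in for the real MessageModel; only the fields used for grouping.
interface MessageModel {
  fromUsername: string;
  toUsername: string;
}

// Hypothetical name for one entry of the result array.
interface MessageThread {
  username: string;
  thread: MessageModel[];
}

// Hypothetical helper: groups messages by the other user's name using a Map,
// so each lookup is a key access instead of an Array.find scan.
function groupIntoThreads(
  received: MessageModel[],
  sent: MessageModel[]
): MessageThread[] {
  const byUser = new Map<string, MessageModel[]>();

  const add = (username: string, message: MessageModel): void => {
    const existing = byUser.get(username);
    if (existing === undefined) {
      byUser.set(username, [message]);
    } else {
      existing.push(message);
    }
  };

  // Received messages go to the sender's thread, sent ones to the recipient's.
  received.forEach(m => add(m.fromUsername, m));
  sent.forEach(m => add(m.toUsername, m));

  // A Map iterates in insertion order, so threads appear in first-seen order.
  return Array.from(byUser, ([username, thread]) => ({ username, thread }));
}
```

Inside the pipe it would then be called as map(([received, sent]) => groupIntoThreads(received, sent)).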