How can I group an Observable and, from each GroupedObservable, keep only the last emitted item in memory, so that each group behaves just like a BehaviorSubject?
Something like this:
{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}
So in memory we'd have only the last item for each user:
{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}
And when a subscriber subscribes, these two items would be issued right away (each in its own emission), as if we had a BehaviorSubject for each user.
onCompleted() is never expected to fire, as people may chat forever.
I don't know in advance what user values there can be.
I assume your chatlog observable is hot. The groupObservables emitted by #groupBy will consequently also be hot and won't keep anything in memory by themselves.
To get the behavior you want (discard everything but the last value from before subscription and continue from there) you could use a ReplaySubject(1).
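To make the "keep only the last value and replay it on subscription" idea concrete, here is a minimal plain-JavaScript sketch. It is not how Rx implements ReplaySubject; the class name LastValueSubject and the final "Fine" message are made up for illustration, and error/completion handling is left out.

```javascript
// Hypothetical stand-in for a replay buffer of size 1; not the Rx implementation.
class LastValueSubject {
  constructor() {
    this.hasValue = false;   // nothing cached until the first onNext
    this.last = undefined;
    this.observers = [];
  }

  // Remember the newest value and push it to everyone currently subscribed.
  onNext(value) {
    this.hasValue = true;
    this.last = value;
    this.observers.forEach(observer => observer(value));
  }

  // A late subscriber first receives the cached value, if there is one.
  subscribe(observer) {
    if (this.hasValue) {
      observer(this.last);
    }
    this.observers.push(observer);
  }
}

const user2 = new LastValueSubject();
user2.onNext({user: 2, msg: "Hi"});
user2.onNext({user: 2, msg: "How are you?"});

// Subscribing late: only the most recent message is replayed.
const received = [];
user2.subscribe(message => received.push(message.msg));

// Later messages (this one is invented for the example) arrive as usual.
user2.onNext({user: 2, msg: "Fine"});
```

After this runs, received holds "How are you?" followed by "Fine"; the earlier "Hi" was discarded. Unlike a BehaviorSubject, this sketch needs no initial value: it simply emits nothing to a new subscriber until the first message arrives.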
Please correct me if I'm wrong.
See jsbin:
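var groups = chatlog
  .groupBy(message => message.user)
  .map(groupObservable => {
    // One ReplaySubject per user, buffering only the latest message.
    var subject = new Rx.ReplaySubject(1);
    // Forward every message of this group into the subject.
    // Note: errors and completion are not forwarded here.
    groupObservable.subscribe(value => subject.onNext(value));
    return subject;
  });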