With ReactiveX in Python, how can I summarize a stream of dictionaries per key?
I have a stream of dictionaries of the form {"user": "...", "date": ...}. I want to write a function I can apply that accumulates, for each user, the dictionary with the latest date, and then emits the accumulated values when the end of the stream is reached. It's like max, but it has to look at the user field and will emit multiple values.
Example - input stream:
{ "user": "a", "date": "2017-02-14" }
{ "user": "b", "date": "2016-01-01" }
{ "user": "c", "date": "2015-01-01" }
{ "user": "a", "date": "2017-01-01" }
{ "user": "b", "date": "2017-01-01" }
Expected output (order does not matter):
{ "user": "a", "date": "2017-02-14" }
{ "user": "c", "date": "2015-01-01" }
{ "user": "b", "date": "2017-01-01" }
I read "Filtering Observables", "Transforming Observables", "Combining Observables", and "Decision Tree of Observable Operators" at https://ninmesara.github.io/RxPY/api/operators/index.html, and looked at reduce/aggregate (which only emit a single value at the end) and flat_map (I don't know how to detect the end of the stream with it). many_select and especially window look promising, but I'm having a hard time understanding them.
How can I do this with rx, either by using one of the existing operators or by writing a custom operator (which I don't know how to do yet)?
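On the reduce point: reduce does emit only one value, but that value can be a dict keyed by user, and flattening the dict's values afterwards yields one emission per user. Below is a minimal sketch of such an accumulator, run with functools.reduce so it works without RxPY. keep_latest is a hypothetical helper, and the sketch assumes ISO-8601 date strings, which compare chronologically as plain strings.

```python
from functools import reduce

# Hypothetical accumulator: maps each user to the record with the latest date seen so far.
# Assumes ISO-8601 date strings, so plain string comparison matches date order.
def keep_latest(acc, item):
    current = acc.get(item["user"])
    if current is None or item["date"] > current["date"]:
        # Build a new dict instead of mutating the shared seed.
        return {**acc, item["user"]: item}
    return acc

events = [
    {"user": "a", "date": "2017-02-14"},
    {"user": "b", "date": "2016-01-01"},
    {"user": "c", "date": "2015-01-01"},
    {"user": "a", "date": "2017-01-01"},
    {"user": "b", "date": "2017-01-01"},
]

latest = reduce(keep_latest, events, {})  # a single summary value, like Rx reduce
for record in latest.values():            # flattening step: one output per user
    print(record)
```

In RxPY this accumulator would presumably be passed as reduce(keep_latest, {}) and followed by a flat_map over the dict's values; the exact operator signatures depend on the RxPY version. Returning a new dict rather than mutating the accumulator keeps the {} seed safe if the pipeline is subscribed more than once.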
I think the following might do what you want.
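import rx  # RxPY 1.x API (rx.Observable.from_)

events = [
    {"user": "a", "date": "2017-02-14"},
    {"user": "b", "date": "2016-01-01"},
    {"user": "c", "date": "2015-01-01"},
    {"user": "a", "date": "2017-01-01"},
    {"user": "b", "date": "2017-01-01"},
]

(rx.Observable.from_(events)
    .group_by(lambda x: x["user"])  # one sub-stream per user
    # max_by emits a list of the max-date items when each group completes;
    # the explicit three-way comparer handles the string keys, and map keeps one item per user.
    .flat_map(lambda group: group
              .max_by(lambda y: y["date"], lambda a, b: -1 if a < b else 1 if a > b else 0)
              .map(lambda items: items[0]))
    .subscribe(print))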
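To make clear what the group_by/max_by pipeline computes, here is a plain-Python model of it that runs without RxPY. It is not how RxPY implements these operators; it only mirrors their results. group_by splits the items by user, and max_by produces, per group, a list of every item sharing the largest date, which is why each group yields a list rather than a single dict. latest_per_user is a hypothetical name, and ISO-8601 date strings are assumed.

```python
from collections import defaultdict

# Hypothetical helper modelling group_by(user) + max_by(date) in plain Python.
# Assumes ISO-8601 date strings, so plain string comparison matches date order.
def latest_per_user(items):
    groups = defaultdict(list)  # like group_by: one bucket per user, arrival order kept
    for item in items:
        groups[item["user"]].append(item)
    result = {}
    for user, group in groups.items():
        top = max(item["date"] for item in group)
        # Like max_by, keep every item that shares the maximum key (ties included).
        result[user] = [item for item in group if item["date"] == top]
    return result

events = [
    {"user": "a", "date": "2017-02-14"},
    {"user": "b", "date": "2016-01-01"},
    {"user": "c", "date": "2015-01-01"},
    {"user": "a", "date": "2017-01-01"},
    {"user": "b", "date": "2017-01-01"},
]

result = latest_per_user(events)
for winners in result.values():
    print(winners[0])  # one record per user, matching the expected output
```

The "end of stream" detection the question asks about is what the Rx version gets for free: each group sub-stream completes when the source completes, and max_by only emits at that point, so no operator has to watch for the end explicitly.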