I have an observable of KeyValuePair<int,double>:
--(7|45.2)--(3|11.1)--(5|13.2)--(6|36.2)--(3|57.4)
I have a list of consumers defined at runtime. Each consumer is only interested in values produced for a single key (myKey).
For example:
Here is my consumer subscription code (one per consumer):
myObservable.Where(t => t.Key == myKey).Subscribe(t => /* DoSomething*/);
Let's take N messages published by myObservable and M consumers.
Let's call Comparison the code t.Key == myKey
For every new message being published, Comparison will be executed M times (once per consumer). In the case of N messages, Comparison will be executed N * M times.
Does Rx (Reactive Extensions) offer another way to do this that avoids executing that many comparisons?
Or do I need to build it myself (for example, using a dictionary of (myKey, consumers) and forwarding each message to the right consumer(s))?
Thanks
So you want to optimize this setup:
IObservable<KeyValuePair<int, double>> myObservable;
myObservable.Where(e => e.Key == 1).Select(e => e.Value).Subscribe(x => { /* DoSomething*/ });
myObservable.Where(e => e.Key == 3).Select(e => e.Value).Subscribe(x => { /* DoSomething*/ });
myObservable.Where(e => e.Key == 7).Select(e => e.Value).Subscribe(x => { /* DoSomething*/ });
This setup will indeed cause multiple comparisons for each element emitted by myObservable. A simple way to optimize it is to group myObservable with the GroupBy operator, in order to create a nested observable structure:
IObservable<IGroupedObservable<int, double>> myGroupedObservable = myObservable
.GroupBy(e => e.Key, e => e.Value);
...and then select the group that you want to observe, and Merge the nested structure to a flat sequence:
myGroupedObservable.Where(g => g.Key == 1).Merge().Subscribe(x => { /* DoSomething*/ });
myGroupedObservable.Where(g => g.Key == 3).Merge().Subscribe(x => { /* DoSomething*/ });
myGroupedObservable.Where(g => g.Key == 7).Merge().Subscribe(x => { /* DoSomething*/ });
This way the comparison will occur only once per key and consumer. You are now comparing IGroupedObservable<int, double> subsequences, not KeyValuePair<int, double> elements. These subsequences are emitted by the GroupBy operator only when it receives the first element with a key it has not seen before. So the total number of comparisons will be equal to N × M, where N is the number of consumers and M is the number of distinct keys (not the number of messages). Unless you are dealing with millions of consumers, this number should be fairly small and should not cause noticeable overhead.
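To make the counting concrete, here is a minimal sketch that wires up the GroupBy approach and counts how often the group filter runs. It assumes the System.Reactive package is referenced. The names GroupedRoutingSketch and Run are made up for illustration, and the consumer keys are assumed to be distinct. The Publish/Connect pair is my own addition, not part of the snippets above: as far as I can tell, without it each consumer subscribes to its own GroupBy pipeline, so every message would be hashed once per consumer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class GroupedRoutingSketch
{
    // Hypothetical helper: replays `messages` through a grouped pipeline and
    // returns the values each consumer received, plus the number of times
    // the group-level filter (g.Key == key) was evaluated.
    public static (Dictionary<int, List<double>> Received, int Comparisons) Run(
        IEnumerable<KeyValuePair<int, double>> messages, int[] consumerKeys)
    {
        var source = new Subject<KeyValuePair<int, double>>();
        var received = new Dictionary<int, List<double>>();
        var comparisons = 0;

        // Publish shares a single GroupBy subscription among all consumers.
        var grouped = source
            .GroupBy(e => e.Key, e => e.Value)
            .Publish();

        foreach (var key in consumerKeys)
        {
            var values = new List<double>();
            received[key] = values;
            grouped
                .Where(g => { comparisons++; return g.Key == key; })
                .Merge()
                .Subscribe(x => values.Add(x));
        }

        // Connect only after every consumer has subscribed, then replay.
        using (grouped.Connect())
        {
            foreach (var message in messages) source.OnNext(message);
            source.OnCompleted();
        }

        return (received, comparisons);
    }
}
```

With the five messages from the question and consumers for keys 1, 3 and 7, the filter should run 12 times (4 distinct keys × 3 consumers) instead of 15 (5 messages × 3 consumers), and the gap widens as more messages arrive for keys that were already seen. One caveat of sharing with Publish: a consumer that subscribes after the group for its key has already been emitted will not see that group, so this sketch subscribes all consumers before connecting.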
A more complex approach that is based on a combination of the GroupBy operator and a ConcurrentDictionary<TKey, Subject<TSource>> can be found in the 5th revision of this answer. It is probably less efficient than the simple approach above, because there are two dictionaries involved (the GroupBy also uses a dictionary internally), so each key is hashed twice.
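For comparison, the "do it myself" idea from the question can be sketched without GroupBy at all. This is not the code from the revision mentioned above, just a rough illustration of routing by dictionary lookup. It assumes the System.Reactive package, the class name KeyRouter and the method ForKey are hypothetical, and the sketch ignores error and completion forwarding.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical router: one dictionary lookup per message, no per-consumer filter.
public sealed class KeyRouter : IDisposable
{
    private readonly ConcurrentDictionary<int, Subject<double>> _subjects =
        new ConcurrentDictionary<int, Subject<double>>();
    private readonly IDisposable _subscription;

    public KeyRouter(IObservable<KeyValuePair<int, double>> source)
    {
        // Messages whose key has no registered consumer are dropped.
        _subscription = source.Subscribe(e =>
        {
            if (_subjects.TryGetValue(e.Key, out var subject))
                subject.OnNext(e.Value);
        });
    }

    // Returns the stream of values for one key, creating its subject on demand.
    public IObservable<double> ForKey(int key) =>
        _subjects.GetOrAdd(key, _ => new Subject<double>()).AsObservable();

    // Stops routing; subjects that were handed out simply go quiet.
    public void Dispose() => _subscription.Dispose();
}
```

A consumer would then call something like router.ForKey(myKey).Subscribe(...) instead of filtering myObservable itself. The trade-off is that you own the lifecycle: subjects are never removed here, and a consumer only sees values published after it called ForKey.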