c# system.reactive

Rx - Complexity of multiple consumers subscribing by key


I have an observable of KeyValuePair<int, double>:

--(7|45.2)--(3|11.1)--(5|13.2)--(6|36.2)--(3|57.4)

I have a list of consumers defined at runtime. Each one is only interested in the values produced for a single key (myKey).

For example:

  • consumer 7 is only interested in the value 45.2.
  • consumer 3 is only interested in the values 11.1 and 57.4.
  • consumer 1 is only interested in values with key 1, so it receives none here.

Here is my consumer subscription code (one per consumer):

myObservable.Where(t => t.Key == myKey).Subscribe(t => /* DoSomething*/);

Let's take:

  • N = number of messages being produced by myObservable
  • M = number of consumers

Let Comparison denote the expression t.Key == myKey.

For every new message being published, Comparison is executed M times (once per consumer), so for N messages it is executed N * M times.
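To make the N * M count concrete, here is a small plain C# sketch (no Rx; C# 9 top-level statements assumed) that models the M independent Where filters as nested loops over the sample stream and the consumers 7, 3 and 1 from the example. It counts one comparison per (message, consumer) pair:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Sample stream from the question: (key | value) pairs.
var messages = new[]
{
    KeyValuePair.Create(7, 45.2),
    KeyValuePair.Create(3, 11.1),
    KeyValuePair.Create(5, 13.2),
    KeyValuePair.Create(6, 36.2),
    KeyValuePair.Create(3, 57.4),
};

// One consumer per key of interest, as in the example (7, 3 and 1).
int[] consumerKeys = { 7, 3, 1 };

var received = consumerKeys.ToDictionary(k => k, _ => new List<double>());
int comparisons = 0;

// Models M independent Where(t => t.Key == myKey) subscriptions:
// every message is offered to every consumer's filter.
foreach (var message in messages)
{
    foreach (var myKey in consumerKeys)
    {
        comparisons++; // one evaluation of t.Key == myKey
        if (message.Key == myKey)
            received[myKey].Add(message.Value);
    }
}

Console.WriteLine($"N = {messages.Length}, M = {consumerKeys.Length}, comparisons = {comparisons}");
```

For the 5 sample messages and 3 consumers it prints `N = 5, M = 3, comparisons = 15`.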

Does Rx offer another way to avoid executing that many comparisons?

Or do I need to build it myself, for example by keeping a dictionary of (myKey, consumers) and forwarding each message to the right consumer(s)?
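A minimal sketch of the hand-rolled dictionary approach, in plain C# without Rx (C# 9 top-level statements assumed): a dictionary maps each key to the callbacks of its consumers, so each message costs one dictionary lookup instead of M comparisons. The names AddConsumer and Dispatch are hypothetical, and the sketch is not thread-safe (consumers should not be added while messages are being dispatched).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical routing table: key -> callbacks of the consumers interested in it.
var consumersByKey = new Dictionary<int, List<Action<double>>>();
int lookups = 0;

// Registers a consumer for a single key (the question's myKey).
void AddConsumer(int myKey, Action<double> onValue)
{
    if (!consumersByKey.TryGetValue(myKey, out var list))
    {
        list = new List<Action<double>>();
        consumersByKey.Add(myKey, list);
    }
    list.Add(onValue);
}

// Forwards a message to the consumers of its key with a single
// dictionary lookup, however many consumers are registered.
void Dispatch(KeyValuePair<int, double> message)
{
    lookups++;
    if (consumersByKey.TryGetValue(message.Key, out var list))
    {
        foreach (var onValue in list)
            onValue(message.Value);
    }
}

// Consumers 7, 3 and 1 from the example, each recording what it receives.
var received = new Dictionary<int, List<double>>
{
    [7] = new List<double>(),
    [3] = new List<double>(),
    [1] = new List<double>(),
};
foreach (var key in received.Keys)
    AddConsumer(key, value => received[key].Add(value));

var messages = new[]
{
    KeyValuePair.Create(7, 45.2),
    KeyValuePair.Create(3, 11.1),
    KeyValuePair.Create(5, 13.2),
    KeyValuePair.Create(6, 36.2),
    KeyValuePair.Create(3, 57.4),
};
foreach (var msg in messages)
    Dispatch(msg);

Console.WriteLine($"messages = {messages.Length}, lookups = {lookups}");
```

It prints `messages = 5, lookups = 5`. With System.Reactive referenced, the Dispatch local function could presumably be attached with `myObservable.Subscribe(Dispatch)`; that wiring is an assumption, not something taken from the question.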

Thanks


Solution

  • So you want to optimize this setup:

    IObservable<KeyValuePair<int, double>> myObservable;
    
    myObservable.Where(e => e.Key == 1).Select(e => e.Value).Subscribe(x => { /* DoSomething*/ });
    myObservable.Where(e => e.Key == 3).Select(e => e.Value).Subscribe(x => { /* DoSomething*/ });
    myObservable.Where(e => e.Key == 7).Select(e => e.Value).Subscribe(x => { /* DoSomething*/ });
    

    This setup will indeed cause multiple comparisons for each element emitted by myObservable. A simple way to optimize it is to group myObservable with the GroupBy operator, in order to create a nested observable structure:

    IObservable<IGroupedObservable<int, double>> myGroupedObservable = myObservable
        .GroupBy(e => e.Key, e => e.Value);
    

    ...and then select the group that you want to observe and Merge the nested structure into a flat sequence:

    myGroupedObservable.Where(g => g.Key == 1).Merge().Subscribe(x => { /* DoSomething*/ });
    myGroupedObservable.Where(g => g.Key == 3).Merge().Subscribe(x => { /* DoSomething*/ });
    myGroupedObservable.Where(g => g.Key == 7).Merge().Subscribe(x => { /* DoSomething*/ });
    

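    This way the comparison occurs only once per key and consumer. You are now comparing IGroupedObservable<int, double> subsequences, not KeyValuePair<int, double> elements, and the GroupBy operator emits each subsequence only when it receives the first element with a new key. So the total number of comparisons will be M * K, where M is the number of consumers and K is the number of distinct keys, instead of N * M, where N is the number of messages. Unless you are dealing with millions of consumers or distinct keys, this number should be fairly small and should not cause noticeable overhead. Keep in mind, though, that each Subscribe call on myGroupedObservable creates a separate GroupBy instance with its own internal dictionary, so each element is still hashed once per consumer; sharing a single subscription (for example with Publish and Connect) avoids this, as long as every consumer subscribes before its group is emitted.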
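The following plain C# sketch (no Rx; C# 9 top-level statements assumed) reproduces the count for the question's sample stream. It models a single grouping shared by all consumers: a HashSet stands in for the GroupBy operator's internal dictionary, and each consumer's Where(g => g.Key == myKey) is evaluated once per newly emitted group.

```csharp
using System;
using System.Collections.Generic;

// Same sample stream and consumers (7, 3 and 1) as in the question.
var messages = new[]
{
    KeyValuePair.Create(7, 45.2),
    KeyValuePair.Create(3, 11.1),
    KeyValuePair.Create(5, 13.2),
    KeyValuePair.Create(6, 36.2),
    KeyValuePair.Create(3, 57.4),
};
int[] consumerKeys = { 7, 3, 1 };

var seenKeys = new HashSet<int>();
int hashLookups = 0;
int groupComparisons = 0;

foreach (var message in messages)
{
    hashLookups++; // one lookup per element in the shared grouping
    // Like GroupBy, a new group is emitted only for the first element of a key.
    if (seenKeys.Add(message.Key))
    {
        // Each consumer's filter inspects the new group exactly once.
        foreach (var myKey in consumerKeys)
            groupComparisons++; // one evaluation of g.Key == myKey
    }
}

Console.WriteLine($"K = {seenKeys.Count}, M = {consumerKeys.Length}, group comparisons = {groupComparisons}, hash lookups = {hashLookups}");
```

It prints `K = 4, M = 3, group comparisons = 12, hash lookups = 5`, compared with 15 comparisons for the per-element filters. The gap is small here because the sample has only 5 messages, but the savings grow as N becomes large relative to K.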

    A more complex approach, based on a combination of the GroupBy operator and a ConcurrentDictionary<TKey, Subject<TSource>>, can be found in the 5th revision of this answer. It is probably less efficient than the simple approach above, because two dictionaries are involved (GroupBy also uses a dictionary internally), so each key is hashed twice.