I have an IObservable<T> sequence where T is a KeyValuePair<TKey, TValue>, which I group using GroupBy from System.Reactive.Linq.
I would like to perform an aggregation operation on each of the resulting IGroupedObservable<TKey, KeyValuePair<TKey, TValue>> instances, but that aggregation is defined as a Func<IEnumerable<TValue>, TValue>.
For example, here I want to count the number of times each distinct word appears and print that to the console:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

Func<IEnumerable<int>, int> aggregate = x => x.Count();
using (new[] { "one", "fish", "two", "fish" }
    .Select(x => new KeyValuePair<string, int>(x, 1))
    .ToObservable()
    .GroupBy(x => x.Key)
    .Select(x => new KeyValuePair<string, IEnumerable<int>>(
        x.Key,
        x.Select(y => y.Value).ToEnumerable()))
    //.SubscribeOn(Scheduler.Default)
    .Subscribe(x => Console.WriteLine($"{x.Key} [{aggregate(x.Value)}]")))
{
}
I expect output similar to this (order is unimportant):
one [1]
fish [2]
two [1]
But instead it either blocks (probably a deadlock) or, when I uncomment the SubscribeOn clause of the LINQ statement, gives no output at all.
The code above is a reduction of my actual usage scenario, which links two TPL Dataflow blocks and shows similar behaviour:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks.Dataflow;

Func<IEnumerable<int>, int> aggregate = x => x.Sum();
var sourceBlock = new TransformBlock<string, KeyValuePair<string, int>>(x => new KeyValuePair<string, int>(x, 1));
var targetBlock = new ActionBlock<KeyValuePair<string, IEnumerable<int>>>(x => Console.WriteLine($"{x.Key} [{aggregate(x.Value)}]"));
using (sourceBlock.AsObservable()
    .GroupBy(x => x.Key)
    .Select(x => new KeyValuePair<string, IEnumerable<int>>(x.Key, x.Select(y => y.Value).ToEnumerable()))
    .Subscribe(targetBlock.AsObserver()))
{
    foreach (var kvp in new[] { "one", "fish", "two", "fish" })
    {
        sourceBlock.Post(kvp);
    }
    sourceBlock.Complete();
    targetBlock.Completion.Wait();
}
I am aware that there are framework-provided Sum and Count methods that operate on IObservable<T>, but I am constrained to IEnumerable<T> aggregation functions.
Have I misunderstood ToEnumerable, and what can I do to fix it?
EDIT:
The IEnumerable<T> constraint is introduced by the target of the two dataflow blocks I am trying to link, whose signature is not mine to change.
GroupBy works like this: when a new element arrives, it extracts the key and checks whether that key has been observed before. If not, it creates a new group (a new observable) and pushes the key and that observable to you. The key point is that when you subscribe to GroupBy and an item is pushed to your subscription, the sequence is not yet grouped. What is pushed is the group key and another observable (IGroupedObservable) to which the elements of that group will be pushed later.
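To make this ordering concrete, here is a small sketch that logs when each group arrives relative to its elements. It assumes the System.Reactive package, uses a Subject<string> as the source so every push is explicit, and GroupByTimeline is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper that records the order in which GroupBy delivers
// groups and their elements.
public static class GroupByTimeline
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<string>();

        source
            .GroupBy(word => word)
            .Subscribe(group =>
            {
                // The group is delivered when its key is first seen,
                // before any of its elements and long before it completes.
                log.Add($"group {group.Key}");
                group.Subscribe(
                    word => log.Add($"item {word}"),
                    () => log.Add($"end {group.Key}"));
            });

        foreach (var word in new[] { "one", "fish", "two", "fish" })
        {
            log.Add($"push {word}");
            source.OnNext(word);
        }

        // Groups complete only when the source itself completes.
        source.OnCompleted();
        return log;
    }
}
```

Each "group" entry appears just before that key's first "item" entry, while every "end" entry appears only after the last push. A handler that blocks until a group ends is therefore waiting for pushes that GroupBy cannot make while the handler is blocking it.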
What your code essentially does is subscribe to GroupBy and then, inside the GroupBy subscription handler, block while trying to enumerate the IGroupedObservable. But you cannot enumerate it at this point, because the grouping is not complete. For it to be complete, GroupBy would have to process the whole sequence, but it cannot, because it is blocked waiting for your subscription handler to return. Meanwhile your subscription handler waits for GroupBy to complete (it is blocked trying to enumerate a sequence that is not yet ready). Hence you have a deadlock.
If you introduce ObserveOn(Scheduler.Default) to run your subscribe handler on thread pool threads, it will not help. It removes the deadlock but introduces a race condition instead, and you will lose items, because you only subscribe to an individual group when you start enumerating the result of ToEnumerable. At that point it might be too late: some items may already have been pushed to the group's observable before you subscribed to it (by starting to enumerate). Those items are not replayed, so they are lost.
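The lost items can be shown deterministically. This sketch, which assumes System.Reactive and uses the hypothetical name LateSubscription, does not use real threads: it forces the worst case of the race by subscribing to each group only after the whole source has been pushed. With replay set to true, it applies the same Replay()/Connect() idea used in the dataflow snippet in this answer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper: returns what a late subscriber sees for each key.
public static class LateSubscription
{
    public static Dictionary<string, List<string>> Run(bool replay)
    {
        var source = new Subject<string>();
        var groups = new List<(string Key, IObservable<string> Items)>();

        source.GroupBy(word => word).Subscribe(group =>
        {
            IObservable<string> items = group;
            if (replay)
            {
                // Subscribe to the group immediately; Replay buffers every element.
                var replayed = group.Replay();
                replayed.Connect();
                items = replayed;
            }
            groups.Add((group.Key, items));
        });

        foreach (var word in new[] { "one", "fish", "two", "fish" })
        {
            source.OnNext(word);
        }
        source.OnCompleted();

        // Late subscription: roughly what happens when enumeration starts too late.
        var seen = new Dictionary<string, List<string>>();
        foreach (var (key, items) in groups)
        {
            var received = new List<string>();
            items.Subscribe(received.Add);
            seen[key] = received;
        }
        return seen;
    }
}
```

Without replay, every late subscriber receives only the completion notification and no elements; with Replay()/Connect(), each subscriber still receives every element of its group.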
What would help is to indeed use the Count() provided for IObservable, but for some reason you said you cannot do that.
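For comparison, the IObservable Count() route for the word-count example could look like this minimal sketch. It assumes System.Reactive, and ObservableCounts is a hypothetical name. SelectMany subscribes to each group's Count() at the moment the group is delivered, so no elements are missed and nothing blocks inside the GroupBy handler.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical helper: counts each distinct word with Rx's Observable.Count.
public static class ObservableCounts
{
    public static Dictionary<string, int> CountWords(IEnumerable<string> words)
    {
        return words
            .ToObservable()
            .GroupBy(word => word)
            // SelectMany subscribes to each group as soon as it arrives;
            // Count() emits a single value when that group completes.
            .SelectMany(group => group.Count()
                .Select(count => new KeyValuePair<string, int>(group.Key, count)))
            // Collect all (key, count) pairs, blocking until the source completes.
            .ToList()
            .Wait()
            .ToDictionary(pair => pair.Key, pair => pair.Value);
    }
}
```

For the input in the question, this yields one = 1, fish = 2 and two = 1. The order in which groups complete is not relied on.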
In your case with dataflow blocks, you can try something like this:
sourceBlock.AsObservable()
    .GroupBy(x => x.Key)
    .Select(x => {
        var res = new { x.Key, Value = x.Select(y => y.Value).Replay() };
        // subscribe to the group right here, as soon as it is created;
        // Replay buffers the values, so no items are missed
        res.Value.Connect();
        return res;
    })
    // observe on thread pool threads if necessary to avoid a deadlock;
    // in the dataflow example in your question it is not necessary,
    // because the ActionBlock enumerates on its own task
    //.ObserveOn(Scheduler.Default)
    // now there is no deadlock and no missing items
    .Select(x => new KeyValuePair<string, IEnumerable<int>>(x.Key, x.Value.ToEnumerable()))
    .Subscribe(targetBlock.AsObserver())