c# .net-3.5 system.reactive

Rx extensions: Where is Parallel.ForEach?


I have a piece of code that uses Parallel.ForEach, probably based on an old version of Rx extensions or the Task Parallel Library. I installed a current version of Rx extensions but cannot find Parallel.ForEach. I'm not using any other fancy features of the library and just want to process some data in parallel like this:

Parallel.ForEach(records, ProcessRecord);

I found this question, but I would not like to depend on an old version of Rx. I was not able to find anything similar in current Rx, though, so what is the current and most straightforward way to do this with a current Rx version? The project is using .NET 3.5.


Solution

  • No need to do all this silly goosery if you have Rx:

    records.ToObservable()
        .SelectMany(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))
        .ToList()
        .First();
    

    (Or, if you want the results kept in the order of the source items, at the cost of efficiency):

    records.ToObservable()
        .Select(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))
        .Concat()
        .ToList()
        .First();
    

    Or, if you want to limit how many items are processed at the same time:

    records.ToObservable()
        .Select(x => Observable.Defer(() => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)))
        .Merge(5 /* at a time */)
        .ToList()
        .First();
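
For anyone who wants a drop-in stand-in for the original `Parallel.ForEach(records, ProcessRecord)` call, the snippets above could be wrapped roughly as follows. This is only a sketch under a few assumptions: `RxParallel` and its `ForEach` overloads are hypothetical names, not part of Rx; the `System.Reactive.Linq` namespace is assumed and may differ in older builds; and `Observable.Start` is called without a scheduler argument so that the library default is used, since the name of the thread-pool scheduler property has differed between Rx releases. The blocking `.First()` call is taken from the snippets above; newer Rx releases may flag it as obsolete.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;   // assumed namespace for the Observable operators
using System.Threading;

// Hypothetical helper, not part of Rx.
public static class RxParallel
{
    // Runs body for every item and blocks until all invocations are done.
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body)
    {
        source.ToObservable()
            // Start runs the action asynchronously on the default scheduler.
            .SelectMany(x => Observable.Start(() => body(x)))
            .ToList()   // emits a single list once every inner sequence completes
            .First();   // blocks the caller until that list arrives
    }

    // Same idea, but with at most maxConcurrent items in flight.
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body, int maxConcurrent)
    {
        source.ToObservable()
            // Defer postpones Start until Merge subscribes to the inner sequence.
            .Select(x => Observable.Defer(() => Observable.Start(() => body(x))))
            .Merge(maxConcurrent)
            .ToList()
            .First();
    }
}

public static class Program
{
    public static void Main()
    {
        int processed = 0;

        // The body runs on several threads, so shared state needs Interlocked.
        RxParallel.ForEach(Enumerable.Range(1, 100),
                           n => Interlocked.Increment(ref processed));
        RxParallel.ForEach(Enumerable.Range(1, 100),
                           n => Interlocked.Increment(ref processed), 5);

        // Both calls block until done, so all 200 increments are visible here.
        Console.WriteLine(processed);
    }
}
```

With such a helper the call site becomes `RxParallel.ForEach(records, ProcessRecord);`. As with `Parallel.ForEach`, the body runs concurrently, so `ProcessRecord` has to be thread-safe.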