Search code examples
c++system.reactiverxcpp

RxCpp Reactive Extensions in C++


I have a Win32 console app and i have imported the references to Rx. Intellisense allows me to do this....

using namespace System::Reactive;
using namespace System::Reactive::Concurrency;
using namespace System::Reactive::Disposables;
using namespace System::Reactive::Joins;
using namespace System::Reactive::Linq;
using namespace System::Reactive::PlatformServices;
using namespace System::Reactive::Subjects;
using namespace System::Reactive::Threading;
using namespace System::Reactive::Threading;
using namespace System::Data::Linq;
using namespace System::Xml::Linq;

I then have a number of classes available, such as ISubject/Subject and IObserver/Observer. However there is no IObservable. I am a little alarmed at the lack of documentation for Rx with Cpp. Am i missing any obvious resources?

I've tried Channel9, Google, Stackoverflow and Facebook groups. This is the working C# code that i have made, i'd like to get this working with C++. This function merges all of the data from different observane sources and puts it out as a list.

So matrix one appears from source one, matrix two appears from source two. They are matched by id and are pushed forward together as a list.

public static IObservable<IList<TSource>> MergeById<TSource>(this IObservable<TSource> source, Func<IList<TSource>, TSource> mergeFunc, Func<TSource, int> keySelector, int bufferCount)
{
   return Observable.Create<IList<TSource>>(o =>  
   {
     var buffer = new Dictionary<int, IList<TSource>>();
     return source.Subscribe<TSource>(i =>
          { 
             var index = keySelector(i);
             if (buffer.ContainsKey(index))
             {                  
               buffer[index].Add(i);
             }
             else
             {                                     
               buffer.Add(index, new List<TSource>(){i});                
             }
             if (buffer.Count==bufferCount)
             { 
               o.OnNext(mergeFunc(buffer[index]));    
               buffer.Remove(index);                
             }
          });
     });
}

Any help here would be good. Can't find some of the classes i want and other aspects of the syntax are different. Are there any sources or examples that show how things are done in C++. It could probably be inferred from those.

Here is the original post of the issue.

http://social.msdn.microsoft.com/Forums/en-US/58a25f70-a7b8-498b-ad7a-b57f3e1152da/rxcpp?forum=rx

I have attempted to ask here before, but there was no response. Hoping this will be a bit more fruitful, now i have more information on what i am trying to achieve.

Thank you.


Solution

  • The top level namespace is rxcpp, not System. To get observable, you'll need:

    #include "cpprx/rx.hpp"
    using namespace rxcpp;