Search code examples
c#system.reactivezero-copy

Reactive Extension, wrap third party API event - zero copy?


I have a third party API that I do not have the source to. I instantiate a callback to an event like this:

using namespace API; // This is where APIClient lives

namespace TestApiClientUI
{
    public partial class Form1 : Form
    {
        APIClient apiClient = new APICLient();
        apiClient.QuoteUpdated += api_ClientUpdated;

        private void api_ClientUpdated(object sender, string s, double b, double a)
        {
        }
    }
}

How do I wrap this into an Rx Observable.FromEvent?

Also, is there a way to do it so the overhead of the wraps copy are as little (zero-copy) as possible?


Solution

  • Let's define a struct to store the event args.

    struct QuoteUpdate
    {
        public string S { get; private set; }
        public double B { get; private set; }
        public double A { get; private set; }
        public QuoteUpdate(string s, double b, double a) : this()
        {
            S = s;
            B = b;
            A = a;
        }
    }
    

    Now we can define an IObservable<QuoteUpdate> as follows.

    var quoteUpdates = Observable.FromEvent<ApiQuoteHandler, QuoteUpdate>(
        emit => (_, s, b, a) => emit(new QuoteUpdate(s, b, a)),
        handler => apiClient.QuoteUpdated += handler,
        handler => apiClient.QuoteUpdated -= handler);
    

    The first lambda defines a mapping from Action<QuoteUpdate> to ApiQuoteHandler. The action, called emit, is what actually broadcasts a value to the subscribers of the observable we are defining. Think of calling emit(value); as something like foreach (var subscriber in subscribers) { subscriber.OnNext(value); }.

    The reason we need this first mapping is because the actual underlying event only knows how to subscribe and unsubscribe ApiQuoteHandler instances. The second and third lambdas are of type Action<ApiQuoteHandler. They are called on subscription and unsubscription, respectively.

    When an IObserver<QuoteUpdate> observer subscribes to the IObservable<QuoteUpdate> we have defined, this is what happens:

    1. observer.OnNext (of type Action<QuoteUpdate>) is passed into our first lambda as the variable emit. This creates an ApiQuoteHandler that wraps the observer.
    2. The ApiQuoteHandler is then passed into our second lambda as the variable handler. The handler subscribes to the QuoteUpdated event on your apiClient.
    3. The IDisposable returned from Subscribe method on our IObservable<QuoteUpdate> contains a reference to the ApiQuoteHandler that was created by the first of our three lambdas.
    4. When at a later time this IDisposable is disposed, the third lambda is called with the same ApiQuoteHandler from before, unsubscribing from the underlying event.

    Because you need a single IObservable<T> for some T yet you have event args (s, b, a), you must define some struct or class to store the three values. I would not worry about the cost of copying a string reference and two double values.