Tags: c#, reactive-programming, system.reactive

How to perform an action after the subscriber on next callback in Rx?


I'd like to be able to add an extension method like this:

public static IObservable<T> AfterSubscriber<T>(this IObservable<T> observable, Action<T> action)

Where:

var subj = new Subject<int>();
subj.
    AfterSubscriber(i => Console.WriteLine("Second")).
    Subscribe(i => Console.WriteLine("First"));

subj.OnNext(1);

Produces the output:

First
Second

Any suggestions on how to implement the above method? Are there any built-in methods I can use? (Assumption: the subscription is synchronous, so the subscriber's callback has finished by the time the action runs.)


Solution

  • How about this implementation?

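    using System;
    using System.Reactive.Linq;

    // Extension methods must live in a static class; the class name is arbitrary.
    public static class AfterSubscriberExtensions
    {
        public static IObservable<T> AfterSubscriber<T>(
            this IObservable<T> @this, Action<T> after)
        {
            if (@this == null) throw new ArgumentNullException(nameof(@this));
            if (after == null) throw new ArgumentNullException(nameof(after));
            return Observable.Create<T>(o =>
            {
                return @this.Subscribe(t =>
                {
                    // Forward the value to the downstream subscriber first...
                    o.OnNext(t);
                    try
                    {
                        // ...then run the follow-up action.
                        after(t);
                    }
                    catch (Exception exception)
                    {
                        // Surface a failure in the action as an error notification.
                        o.OnError(exception);
                    }
                }, o.OnError, o.OnCompleted);
            });
        }
    }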
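
    To check the ordering, and to address the question about built-in methods, here is a small sketch that compares the extension with the built-in Do operator. Do invokes its action before the value reaches the subscriber, so it yields the opposite order. The sketch assumes the System.Reactive package is referenced. The class name AfterSubscriberDemo and the helper Run are made up for illustration, and the extension is repeated in condensed form only so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class AfterSubscriberDemo
{
    // Condensed copy of the extension above (null checks omitted for brevity).
    public static IObservable<T> AfterSubscriber<T>(
        this IObservable<T> source, Action<T> after)
    {
        return Observable.Create<T>(o => source.Subscribe(
            t =>
            {
                o.OnNext(t);                 // subscriber runs first
                try { after(t); }            // then the follow-up action
                catch (Exception ex) { o.OnError(ex); }
            },
            o.OnError,
            o.OnCompleted));
    }

    // Hypothetical helper: records the order in which the callbacks fire.
    // useDo == true swaps in the built-in Do operator for comparison.
    public static List<string> Run(bool useDo)
    {
        var log = new List<string>();
        var subj = new Subject<int>();

        IObservable<int> pipeline = useDo
            ? subj.Do(i => log.Add("Second"))
            : subj.AfterSubscriber(i => log.Add("Second"));

        using (pipeline.Subscribe(i => log.Add("First")))
        {
            subj.OnNext(1); // Subject delivers synchronously to the subscription
        }
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(useDo: false))); // First, Second
        Console.WriteLine(string.Join(", ", Run(useDo: true)));  // Second, First
    }
}
```

    Note that the action runs once per subscription: with two subscribers to the same AfterSubscriber sequence, it is invoked after each subscriber's callback, not once after all of them.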