Search code examples
c#.netobservablesystem.reactiverx.net

Rx.net implement retry functionality on disconnect/error in observable


Here is my current code:

    /// <summary>
    /// Thin wrapper that hands the transport's <c>FooData</c> stream to subscribers.
    /// </summary>
    public class FooService
    {
        private ITransportService _transportService;

        /// <summary>
        /// Stores the transport and calls <c>Connect()</c> on it right away.
        /// </summary>
        /// <param name="transportService">Transport that produces the data stream.</param>
        public FooService(ITransportService transportService)
        {
            _transportService = transportService;
            transportService.Connect();
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> directly to the transport's observable.
        /// </summary>
        /// <param name="observer">Receiver of the <c>FooData</c> notifications.</param>
        /// <returns>The subscription handle returned by the transport's observable.</returns>
        public IDisposable Subscribe(IObserver<FooData> observer)
        {
            // No reconnect handling here: whatever the transport's stream signals
            // (including errors) is passed straight to the observer.
            IObservable<FooData> source = _transportService.GetObservable();
            return source.Subscribe(observer);
        }
    }

    /// <summary>
    /// Contract of the transport that <c>FooService</c> consumes.
    /// </summary>
    public interface ITransportService
    {
        /// <summary>
        /// Asks the transport to connect.
        /// NOTE(review): the meaning of the returned bool is not shown here — confirm.
        /// </summary>
        bool Connect();

        /// <summary>Current connection state of the transport.</summary>
        ConnectionState State { get; }

        /// <summary>Returns the stream of <c>FooData</c> items produced by the transport.</summary>
        IObservable<FooData> GetObservable();
    }

    // Sketch of the client side: builds a FooService, subscribes a hand-written
    // observer to it and keeps the returned subscription handle.
    public class ClientConsumingProgram
    {
        // Observer that receives the FooData notifications.
        class FooObserver : IObserver<FooData>
        {
            public void OnNext(FooData value)
            {
                // The client should keep consuming values here without interruption.
            }
            // OnError and OnCompleted are elided in this sketch.
        }
        public static void Main()
        {
            // NOTE(review): transportService is not declared anywhere in this snippet —
            // presumably an ITransportService instance created elsewhere; confirm.
            var fooService = new FooService(transportService);
            var fooObserver = new FooObserver();
            // The subscription handle is kept in a local but never disposed in this sketch.
            var disposable = fooService.Subscribe(fooObserver);
        }
    }

I want to implement the following:

When the transport service is disconnected (socket closed by the server), I want the application to retry a few times. To retry, FooService first needs to call Connect on _transportService and then, once State is connected, call GetObservable again.

The desired result is that OnNext on FooObserver keeps ticking on the client side if _transportService reconnects before the maximum number of retries is reached; once the maximum is exceeded, OnError should be fired.

Can someone point me in the right direction for implementing this?


UPDATE

/// <summary>
/// Exposes the transport's <c>FooData</c> stream, switching between the data stream
/// and a reconnect attempt according to the transport's connection state.
/// </summary>
public class FooService
{
    private ITransportService _transportService;

    /// <summary>
    /// Stores the transport and calls <c>Connect()</c> on it right away.
    /// </summary>
    public FooService(ITransportService transportService)
    {
        _transportService = transportService;
        transportService.Connect();
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to a stream that follows the connection state:
    /// the data stream while the state is Open, a reconnect attempt otherwise.
    /// </summary>
    /// <returns>The subscription handle for the combined stream.</returns>
    public IDisposable Subscribe(IObserver<FooData> observer)
    {
        // Collapse the state stream to "open / not open" and react to changes only.
        IObservable<bool> isOpenChanges = _transportService.GetConnectionStateObservable()
            .Select(state => state == ConnectionState.Open)
            .DistinctUntilChanged();

        // Switch always listens to the inner stream chosen for the latest state.
        IObservable<FooData> data = isOpenChanges
            .Select(open => open ? _transportService.GetObservable() : ConnectAndWait())
            .Switch();

        return data.Subscribe(observer);
    }

    // Calls Connect() in the background, emits nothing, and never completes;
    // the stream is only left when the next state change makes Switch move on.
    private IObservable<FooData> ConnectAndWait()
    {
        return Observable.Start(() => _transportService.Connect())
            .IgnoreElements()
            .Select(_ => default(FooData))
            .Concat(Observable.Never<FooData>());
    }
}

/// <summary>
/// Contract of the transport that <c>FooService</c> consumes.
/// </summary>
public interface ITransportService
{
    /// <summary>
    /// Asks the transport to connect.
    /// NOTE(review): the meaning of the returned bool is not specified here — confirm.
    /// </summary>
    bool Connect();

    /// <summary>Returns a stream of connection-state notifications.</summary>
    IObservable<ConnectionState> GetConnectionStateObservable();

    /// <summary>Returns the stream of <c>FooData</c> items produced by the transport.</summary>
    IObservable<FooData> GetObservable();
}

/// <summary>Item carried by the transport's data stream.</summary>
public class FooData
{
    /// <summary>Integer identifier of the item.</summary>
    public int Id { get; set; }
    /// <summary>Text carried by the item.</summary>
    public string Msg { get; set; }
}

/// <summary>Connection states reported by the transport.</summary>
public enum ConnectionState
{
    /// <summary>The connection is open. This is also the enum's default value (0).</summary>
    Open = 0,

    /// <summary>The connection is closed.</summary>
    Close = 1
}

/// <summary>
/// In-memory test double for <see cref="ITransportService"/>. Its data stream emits
/// items 1 and 2, fails once with a simulated disconnect, and on every later
/// subscription emits items 1 through 4.
/// </summary>
public class FooMockTransportService : ITransportService
{
    // Replays the latest connection state to every new subscriber.
    private readonly BehaviorSubject<ConnectionState> _connectionSubject =
        new BehaviorSubject<ConnectionState>(ConnectionState.Close);

    // True until the single simulated disconnect has been triggered.
    private bool _shouldDisconnect;

    /// <summary>Last connection state set by this mock.</summary>
    public ConnectionState State { get; set; }

    /// <summary>
    /// Creates a disconnected mock that will simulate exactly one disconnect.
    /// </summary>
    public FooMockTransportService()
    {
        _shouldDisconnect = true;
        // Keep State in step with the subject's initial value; the enum's
        // default (0) would otherwise report Open before Connect() is called.
        State = ConnectionState.Close;
    }

    /// <summary>
    /// Marks the transport as open and publishes <c>ConnectionState.Open</c>.
    /// </summary>
    /// <returns>Always <c>true</c>.</returns>
    public bool Connect()
    {
        State = ConnectionState.Open;
        _connectionSubject.OnNext(ConnectionState.Open);
        return true;
    }

    /// <summary>Returns the connection-state stream with the subject hidden.</summary>
    public IObservable<ConnectionState> GetConnectionStateObservable()
    {
        return _connectionSubject.AsObservable();
    }

    /// <summary>
    /// Returns a stream whose items are produced on the task pool for each subscriber.
    /// </summary>
    public IObservable<FooData> GetObservable()
    {
        return Observable.Create<FooData>(
            o =>
            {
                // Hand the scheduler's handle back to Rx so that unsubscribing
                // can cancel the work if it has not started yet.
                return TaskPoolScheduler.Default.Schedule(() =>
                {
                    o.OnNext(new FooData { Id = 1, Msg = "First" });
                    o.OnNext(new FooData { Id = 2, Msg = "Sec" });

                    // Simulate disconnection, only once
                    if (_shouldDisconnect)
                    {
                        _shouldDisconnect = false;
                        State = ConnectionState.Close;
                        o.OnError(new Exception("Disconnected"));
                        _connectionSubject.OnNext(ConnectionState.Close);

                        // OnError is terminal: nothing more may be sent to this observer.
                        return;
                    }

                    o.OnNext(new FooData { Id = 3, Msg = "Third" });
                    o.OnNext(new FooData { Id = 4, Msg = "Fourth" });
                });
            });
    }
}

/// <summary>
/// Console harness: wires the mock transport to <c>FooService</c> and prints
/// whatever the subscription delivers until a key is pressed.
/// </summary>
public class Program
{
    // Observer that prints each item's Id and any error to the console.
    class FooObserver : IObserver<FooData>
    {
        public void OnCompleted()
        {
            // Intentionally empty: completion is a normal notification, and
            // throwing from an observer callback would surface as an unhandled
            // exception inside the pipeline.
        }

        public void OnError(Exception error)
        {
            Console.WriteLine(error);
        }

        public void OnNext(FooData value)
        {
            Console.WriteLine(value.Id);
        }
    }

    /// <summary>
    /// Subscribes the observer, waits for console input, then releases the subscription.
    /// </summary>
    public static void Main()
    {
        var transportService = new FooMockTransportService();
        var fooService = new FooService(transportService);
        var fooObserver = new FooObserver();

        // Dispose the subscription once the user ends the session.
        using (fooService.Subscribe(fooObserver))
        {
            Console.Read();
        }
    }
}

The code is compilable and also incorporates Shlomo's suggestions. Current output:

1
2
System.Exception: Disconnected

Desired output: on disconnect it should catch the error and retry (for example, every 1 second), checking whether the connection is back:

1
2
1
2
3
4

Solution

  • If you control ITransportService, I would recommend adding a member that exposes the connection state as an observable:

    /// <summary>
    /// Transport contract, extended with an observable view of the connection state.
    /// </summary>
    public interface ITransportService
    {
        /// <summary>Current connection state of the transport.</summary>
        ConnectionState State { get; }

        /// <summary>
        /// Returns a stream of connection-state notifications.
        /// This is the member recommended as an addition to the interface.
        /// </summary>
        IObservable<ConnectionState> GetConnectionStateObservable();

        /// <summary>
        /// Asks the transport to connect.
        /// NOTE(review): the meaning of the returned bool is not specified here — confirm.
        /// </summary>
        bool Connect();

        /// <summary>Returns the stream of <c>FooData</c> items produced by the transport.</summary>
        IObservable<FooData> GetObservable();
    }
    

    Once you can get the states in an observable fashion, producing the observable becomes easier:

    /// <summary>
    /// Exposes the transport's <c>FooData</c> stream, choosing between the data stream
    /// and a reconnect attempt each time the connection state changes.
    /// </summary>
    public class FooService
    {
        private ITransportService _transportService;

        /// <summary>
        /// Stores the transport and calls <c>Connect()</c> on it right away.
        /// </summary>
        public FooService(ITransportService transportService)
        {
            _transportService = transportService;
            transportService.Connect();
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the state-driven stream.
        /// </summary>
        /// <returns>The subscription handle for the combined stream.</returns>
        public IDisposable Subscribe(IObserver<FooData> observer)
        {
            // Only changes between "open" and "not open" matter.
            IObservable<bool> isOpenChanges = _transportService.GetConnectionStateObservable()
                .Select(state => state == ConnectionState.Open)
                .DistinctUntilChanged();

            IObservable<IObservable<FooData>> streamPerState = isOpenChanges.Select(open =>
            {
                IObservable<FooData> inner;
                if (open)
                {
                    // Open: forward the transport's data.
                    inner = _transportService.GetObservable();
                }
                else
                {
                    // Not open: call Connect() in the background, emit nothing,
                    // and stay silent until the next state change arrives.
                    inner = Observable.Start(() => _transportService.Connect())
                        .IgnoreElements()
                        .Select(_ => default(FooData))
                        .Concat(Observable.Never<FooData>());
                }
                return inner;
            });

            // Switch always listens to the stream chosen for the latest state.
            return streamPerState
                .Switch()
                .Subscribe(observer);
        }
    }
    

    If you don't control ITransportService, I would recommend creating an interface that inherits from it where you can add a similar member.

    As an aside, I would recommend you ditch FooObserver, you almost never need to fashion your own observer. Expose the observable, and calling a Subscribe overload on the Observable normally does the trick.

    I can't test any of this though: there's no clarity as to what the retry logic should be like, what the return value for Connect means, or what the ConnectionState class is, and the code doesn't compile. You should try to fashion your question as a mcve.


    UPDATE:

    The following handles the test code as expected:

    /// <summary>
    /// Subscribes <paramref name="observer"/> to a stream that follows the connection state:
    /// the data stream (with its errors suppressed) while the state is Open, and a
    /// reconnect attempt otherwise. The resulting stream never terminates on its own.
    /// </summary>
    /// <returns>The subscription handle; disposing it is the only way to stop the stream.</returns>
    public IDisposable Subscribe(IObserver<FooData> observer)
    {
        // Only changes between "open" and "not open" matter.
        var isOpenChanges = _transportService.GetConnectionStateObservable()
            .Select(state => state == ConnectionState.Open)
            .DistinctUntilChanged();

        var data = isOpenChanges
            .Select(open =>
            {
                if (open)
                {
                    // Replace an inner error with silence so it does not travel
                    // through Switch and end the whole subscription.
                    return _transportService.GetObservable()
                        .Catch(Observable.Never<FooData>());
                }

                // Not open: call Connect() in the background, emit nothing,
                // and stay silent until the next state change arrives.
                return Observable.Start(() => _transportService.Connect())
                    .IgnoreElements()
                    .Select(_ => default(FooData))
                    .Concat(Observable.Never<FooData>());
            })
            .Switch();

        return data.Subscribe(observer);
    }
    

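    The only change from the originally posted code is the additional .Catch(Observable.Never<FooData>()). It swallows the error from the data stream, so the error no longer propagates through Switch and terminates the whole subscription; the following Close state then triggers the reconnect branch. As written, this code will run forever. I hope you have some way to terminate the observable external to what's posted.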