HTTP polling until request is fulfilled with Rx


I am requesting a resource over HTTP from a RESTful API. The resource takes time to reach a 'complete' status, and the HTTP status code of the response reflects the completeness of the resource.

The calling client is a user waiting to consume the resource who does not want to wait indefinitely, so the API is polled up to a maximum number of attempts. If the API returns a status indicating the resource is complete, the resource is returned up the stack for consumption. If it does not do so within the maximum number of attempts, the client should stop polling and return some kind of failure status.

To achieve this at the moment I am using Thread.Sleep(500) to block the thread that is waiting for the result. It's giving me nightmares thinking how inefficient this is, so I am looking to improve the technique.

I have looked into Rx and tried in vain to bend Observable.Interval() to my will, but I am having trouble observing on the calling thread. Ideally I would like to know what my code should look like! Currently I am at:

    Observable.Interval(TimeSpan.FromMilliseconds(500))
        .ObserveOn(SynchronizationContext.Current)
        .Take(10)
        .Subscribe(i =>
        {
            // check if resource is complete
        });

Solution

  • I took the TPL route in the end. I avoided the explicit Thread.Sleep by using Task.Delay, which is one less namespace to reference at least.

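    using System;
    using System.Net.Http;
    using System.Threading.Tasks;

    // Outcome of a polling run: the deserialized value (if any), whether the
    // criteria were met, and the attempt counter at the point of success.
    public class HttpRequestRetrierResponseContainer<T>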
    {
        public T Value { get; set; }
        public bool IsCriteriaMet { get; set; }
        public int AttemptCount { get; set; }
    }
    
    public class HttpRequestRetrier
    {
        private readonly IHttpClientWrapper _httpClientWrapper;
    
        public HttpRequestRetrier(IHttpClientWrapper httpClientWrapper)
        {
            _httpClientWrapper = httpClientWrapper;
        }
    
        public async Task<HttpRequestRetrierResponseContainer<T>> RepeatRequestUntilResponseMeetsCriteria<T>(Func<HttpRequestMessage> getMessage, Predicate<HttpResponseMessage> criteria, int maxAttempts)
        {
            return await Attempt<T>(getMessage, criteria, maxAttempts, 0);
        }
    
        private async Task<HttpRequestRetrierResponseContainer<T>> Attempt<T>(Func<HttpRequestMessage> getMessage, Predicate<HttpResponseMessage> criteria, int maxAttempts, int attemptCount)
        {
            if (attemptCount < maxAttempts)
            {
                var response = await _httpClientWrapper.SendMessageAsync(getMessage);
    
                if (criteria(response))
                {
                    var value = await response.Extensions().ReadAsAsync<T>();
                    return new HttpRequestRetrierResponseContainer<T> { IsCriteriaMet = true, AttemptCount = attemptCount, Value = value };
                }
    
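                // Await the delay so that no thread is blocked between attempts
                // (Task.Delay(500).Wait() would block just like Thread.Sleep).
                await Task.Delay(500);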
    
                return await Attempt<T>(getMessage, criteria, maxAttempts, attemptCount + 1);
            }
    
            return new HttpRequestRetrierResponseContainer<T> { IsCriteriaMet = false };
        }
    }
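
The same idea can also be written as a plain loop instead of recursion. The following is only a minimal sketch under some assumptions: `Poller`, `PollUntilAsync` and `PollResult<T>` are hypothetical names that are not part of the answer above, the HTTP call is abstracted behind a `poll` delegate, and `AttemptCount` is one-based here. It awaits `Task.Delay` so no thread is blocked between attempts, and accepts an optional `CancellationToken` so the caller can give up early.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical result type, modelled on HttpRequestRetrierResponseContainer<T>.
public sealed class PollResult<T>
{
    public T Value { get; set; }
    public bool IsCriteriaMet { get; set; }
    public int AttemptCount { get; set; }
}

public static class Poller
{
    // Calls poll() up to maxAttempts times, pausing for `delay` between
    // attempts without blocking a thread. Stops as soon as isComplete
    // accepts the polled value.
    public static async Task<PollResult<T>> PollUntilAsync<T>(
        Func<Task<T>> poll,
        Predicate<T> isComplete,
        int maxAttempts,
        TimeSpan delay,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            var value = await poll();

            if (isComplete(value))
            {
                return new PollResult<T>
                {
                    Value = value,
                    IsCriteriaMet = true,
                    AttemptCount = attempt
                };
            }

            // There is no point in waiting after the final attempt.
            if (attempt < maxAttempts)
            {
                await Task.Delay(delay, cancellationToken);
            }
        }

        // Every attempt was used without the criteria being met.
        return new PollResult<T>
        {
            IsCriteriaMet = false,
            AttemptCount = Math.Max(maxAttempts, 0)
        };
    }
}
```

In the scenario from the question, `poll` would wrap the HTTP request and `isComplete` would inspect the response status code, for example `await Poller.PollUntilAsync(sendRequest, r => r.IsSuccessStatusCode, 10, TimeSpan.FromMilliseconds(500))`, where `sendRequest` is an assumed `Func<Task<HttpResponseMessage>>`. Cancelling the token while a delay is pending surfaces as a `TaskCanceledException` to the caller.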