c#, multithreading, async-await, system.reactive, rx.net

Reactive Extensions OperationCanceledException on Enumerable.Range / Observable.FromAsync


I have the code below, which pulls data from a paginated REST API.

When using Reactive Extensions, it gets near the end of the downloads (e.g. page 1,636 of the known target of 1,653; the exact page it reaches depends on the number of concurrent fetches, with higher concurrency stopping further short of the target), and then my receive function throws an OperationCanceledException, even though I never cancel my cancellation token source.

It is as if Select is somehow cancelling my function, but only near the end of the pagination requests, or as if the observable terminates and then tears down the rest of my pipeline. That is my guess, anyway; I'm new to Rx.NET.

This is not a rate-limit issue either; it also happens with one download at a time (MaxConcurrentDownloads set to 1).

Any ideas what I am doing wrong in the code below?

using var httpClient = new HttpClient();
var api = new PolygonWebApi(httpClient, this.apiKey);
var list = new List<TickerV2>();
var start = DateTime.Now;

// get first page
var response = await api.GetTickersAsync(BatchSize, 1, this.cts.Token);
list.AddRange(response.Tickers);
var pages = (response.Count + BatchSize - 1) / BatchSize;
                
var query = Enumerable
                .Range(2, pages - 1)
                .ToObservable()
                .Select(page => Observable.FromAsync(() =>
                {
                    return api
                            .GetTickersAsync(BatchSize, page, this.cts.Token)
                            .ContinueWith( x => new TickersResponseWithPage(page, x.Result));
                }))
                .Merge(MaxConcurrentDownloads);

query.Subscribe((response) =>
{
    this.logger.LogInformation($"adding {response.TickersResponse.Tickers.Length} records from page {response.Page}");
    list.AddRange(response.TickersResponse.Tickers);
});
await query.ToTask(this.cts.Token);
                
var duration = DateTime.Now - start;
this.logger.LogInformation($"{nameof(UpdateTickersWorker)} downloaded {list.Count:n0} in {duration.Humanize()}");   
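One likely contributor, consistent with the repeating pages mentioned in the update, is that `query` is subscribed to twice: once by `query.Subscribe(...)` and again by `query.ToTask(this.cts.Token)`. The pipeline is cold, so each subscription re-runs `Range` and every `FromAsync` call, downloading every page twice. A plausible but unverified link to the `OperationCanceledException`: if the method returns soon after `await query.ToTask(...)` completes, `using var httpClient` disposes the client while the other subscription still has requests in flight, and disposing an `HttpClient` cancels its pending requests. The sketch below, assuming only the System.Reactive package, reproduces the double subscription with a simulated fetch (`Task.Delay` stands in for `GetTickersAsync`; the page range is hypothetical):

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

int fetchCount = 0; // how many simulated page fetches actually started

// A cold pipeline shaped like the question's query: nothing runs until
// someone subscribes, and every subscription re-runs all of it.
IObservable<int> query =
    Observable
        .Range(2, 3) // hypothetical pages 2..4
        .Select(page => Observable.FromAsync(async ct =>
        {
            Interlocked.Increment(ref fetchCount);
            await Task.Delay(10, ct); // stands in for the HTTP request
            return page;
        }))
        .Merge(2);

// Subscription #1: like query.Subscribe(...) in the question.
var firstDone = new TaskCompletionSource<bool>();
query.Subscribe(
    page => Console.WriteLine($"Subscribe handler got page {page}"),
    () => firstDone.SetResult(true));

// Subscription #2: ToTask() subscribes again and fetches every page a second time.
await query.ToTask();
await firstDone.Task; // let subscription #1 finish too, so the count is stable

Console.WriteLine($"fetches started: {fetchCount}"); // 3 pages x 2 subscriptions
```

Subscribing only once, for example by logging inside the pipeline with `Do` and awaiting a single `ToList()` as the update does, avoids the second round of requests.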

Additional info, if required: a sequential test proves that the API calls are OK and return all 1,653 pages:

using var httpClient = new HttpClient();
var api = new PolygonWebApi(httpClient, this.apiKey);
var list = new List<TickerV2>();
var start = DateTime.Now;


// get first page
var response = await api.GetTickersAsync(BatchSize, 1, this.cts.Token);
list.AddRange(response.Tickers);
var pages = (response.Count + BatchSize - 1) / BatchSize;

// read from second page
for (var page = 2; page <= pages && this.cts.Token.IsCancellationRequested == false; page++)
{
    response = await api.GetTickersAsync(BatchSize, page, this.cts.Token);
    list.AddRange(response.Tickers);
    this.logger.LogInformation($"adding {response.Tickers.Length} records from page {page}");
}

var duration = DateTime.Now - start;
this.logger.LogInformation($"{nameof(UpdateTickersWorker)} downloaded {list.Count} in {duration.Humanize()}");
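Both versions compute the page count with integer ceiling division, `(count + BatchSize - 1) / BatchSize`. A minimal check with hypothetical numbers (chosen so that a batch size of 100 gives 1,653 pages; `PageCount` is an illustrative helper, not part of the original code):

```csharp
using System;

// Integer ceiling division: pages needed for `count` items at `batchSize` per page.
// PageCount is a hypothetical helper for illustration; assumes batchSize > 0, count >= 0.
static int PageCount(int count, int batchSize) => (count + batchSize - 1) / batchSize;

int exact = PageCount(165_300, 100);   // divides evenly
int partial = PageCount(165_301, 100); // one leftover item needs an extra page
int empty = PageCount(0, 100);         // no items, no pages

Console.WriteLine($"{exact} {partial} {empty}"); // prints "1653 1654 0"
```

In the Rx versions, a reported `Count` of 0 would make `pages` 0, and `Range(2, pages - 1)` would then throw `ArgumentOutOfRangeException` for the negative count; the sequential loop simply does nothing in that case.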

UPDATE

I modified the code as below to stop the repeating pages, and that seems to have resolved the repeating issue:

IObservable<IList<TickerV2>> query =
    Observable
        .Using(
            () => new HttpClient(),
            hc =>
                from first_response in Observable.FromAsync(ct => PolygonWebApi.GetTickersAsync(hc, this.apiKey, BatchSize, 1, ct))
                let pages = (first_response.Count + BatchSize - 1) / BatchSize
                from trwp in
                    Observable
                        .Range(2, pages - 1)
                        .Select(page =>
                            Observable
                                .FromAsync(ct => PolygonWebApi.GetTickersAsync(hc, this.apiKey, BatchSize, page, ct))
                                .Select(r => new TickersResponseWithPage(page, r)))
                        .Merge(MaxConcurrentDownloads)
                        .StartWith(new TickersResponseWithPage(1, first_response))
                from tv2 in trwp.TickersResponse.Tickers
                select tv2)
        .ToList();

list = await query.ToTask(this.cts.Token);
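The update's shape (fetch page 1, derive the page count, `Merge` the remaining pages with a concurrency cap, `StartWith` page 1, flatten, `ToList`) can be exercised offline. In this sketch, assuming only the System.Reactive package, `FetchPageAsync`, `TotalItems`, `BatchSize` and `MaxConcurrent` are hypothetical stand-ins for the Polygon call and the question's settings. The pipeline is subscribed exactly once, by `ToTask`, so every item arrives exactly once:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

const int TotalItems = 95;   // hypothetical total, plays the role of TickersResponse.Count
const int BatchSize = 10;    // hypothetical page size
const int MaxConcurrent = 4; // hypothetical cap, like MaxConcurrentDownloads

// Hypothetical stand-in for GetTickersAsync: returns the total item count
// plus the ids of the items on the requested page (pages start at 1).
async Task<(int Count, int[] Items)> FetchPageAsync(int page, CancellationToken ct)
{
    await Task.Delay(5, ct); // simulated latency; honours the token FromAsync passes in
    int start = (page - 1) * BatchSize;
    int size = Math.Max(0, Math.Min(BatchSize, TotalItems - start));
    return (TotalItems, Enumerable.Range(start, size).ToArray());
}

IObservable<IList<int>> query =
    (from first in Observable.FromAsync(ct => FetchPageAsync(1, ct))
     let pages = (first.Count + BatchSize - 1) / BatchSize
     from page in
         Observable
             .Range(2, pages - 1)
             .Select(p =>
                 Observable
                     .FromAsync(ct => FetchPageAsync(p, ct)) // starts only when Merge subscribes
                     .Select(r => (Page: p, r.Items)))
             .Merge(MaxConcurrent)              // at most MaxConcurrent requests in flight
             .StartWith((Page: 1, first.Items)) // page 1 was already fetched
     from item in page.Items                     // flatten each page into its items
     select item)
    .ToList();

using var cts = new CancellationTokenSource();
IList<int> all = await query.ToTask(cts.Token); // the only subscription

Console.WriteLine($"downloaded {all.Count} items");
```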

Solution

  • You're doing a lot of odd mixing of synchronous code, enumerables, Rx, and Tasks, and all of that makes for a big mess when debugging. You should pick one monad and stay in it the whole time; don't mix them.

    Can you please try this pure Rx version of your code and let me know what results you get? Please append them to the end of your question and don't change what is there.

    IObservable<IList<TickerV2>> query =
        Observable
            .Using(
                () => new HttpClient(),
                hc =>
                    from first_response in Observable.FromAsync(ct => api.GetTickersAsync(BatchSize, 1, ct))
                    let pages = (first_response.Count + BatchSize - 1) / BatchSize
                    from trwp in
                        Observable
                            .Range(2, pages - 1)
                            .SelectMany(page =>
                                Observable
                                    .FromAsync(ct => api.GetTickersAsync(BatchSize, page, ct))
                                    .Select(r => new TickersResponseWithPage(page, r)))
                            .StartWith(new TickersResponseWithPage(1, first_response))
                    from tv2 in trwp.TickersResponse.Tickers
                    select tv2)
            .ToList();
    
    IList<TickerV2> list = await query;
    

    Here's how to create the api object in a Defer:

    IObservable<IList<TickerV2>> query =
        Observable
            .Defer(() =>
            {
                var api = new PolygonWebApi(httpClient, this.apiKey);
                return
                    Observable
                        .Using(... as above ...)
                        .ToList();
            });
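To see what `Defer` and `Using` contribute, the following sketch (assuming only the System.Reactive package) replaces `HttpClient` with a hypothetical `Disposable.Create` resource and the paged download with `Observable.Range(1, 3).Sum()`. It subscribes twice and counts how often each factory runs and how often the resource is disposed:

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

int factoryRuns = 0; // times the Defer factory ran
int created = 0;     // times the Using resource factory ran
int disposed = 0;    // times a resource was disposed

IObservable<int> query =
    Observable.Defer(() =>
    {
        factoryRuns++; // per-subscription setup, like `new PolygonWebApi(...)`
        return Observable.Using(
            () =>
            {
                created++;
                // Hypothetical stand-in for `new HttpClient()`.
                return Disposable.Create(() => disposed++);
            },
            resource => Observable.Range(1, 3).Sum()); // stand-in for the paged download
    });

int first = await query;  // subscription 1: awaiting an IObservable yields its last value
int second = await query; // subscription 2: both factories run again, fresh resource

Console.WriteLine($"results {first}, {second}; factory runs {factoryRuns}, created {created}, disposed {disposed}");
```

Because `Defer` runs its factory on every subscription, anything created there (such as the `api` object) is per-subscription rather than shared, and `Using` ties the resource's lifetime to the sequence, disposing it when the sequence terminates or is unsubscribed, instead of to a `using var` in the enclosing method.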