I have the code below, which pulls data from a paginated REST API.
When using Reactive Extensions, it gets close to the end of the downloads (e.g. page 1,636 of the known target of 1,653; the exact page it reaches depends on the number of concurrent fetches, with higher concurrency resulting in a lower page count), and then my receive function throws an OperationCancelled exception (however, I never cancel my cancellation token source).
It is as if Select is cancelling my function somehow, but only near the end of the pagination requests, or as if the observable terminates and then kills off my observable, I think (but I am new to Rx.NET).
This is not a rate limit issue either; it also happens with one download at a time (MaxConcurrentDownloads set to 1).
Any ideas what I am doing wrong with the code below, please?
using var httpClient = new HttpClient();
var api = new PolygonWebApi(httpClient, this.apiKey);
var list = new List<TickerV2>();
var start = DateTime.Now;
// get first page
var response = await api.GetTickersAsync(BatchSize, 1, this.cts.Token);
list.AddRange(response.Tickers);
var pages = (response.Count + BatchSize - 1) / BatchSize;
var query = Enumerable
    .Range(2, pages - 1)
    .ToObservable()
    .Select(page => Observable.FromAsync(() =>
    {
        return api
            .GetTickersAsync(BatchSize, page, this.cts.Token)
            .ContinueWith(x => new TickersResponseWithPage(page, x.Result));
    }))
    .Merge(MaxConcurrentDownloads);
query.Subscribe((response) =>
{
    this.logger.LogInformation($"adding {response.TickersResponse.Tickers.Length} records from page {response.Page}");
    list.AddRange(response.TickersResponse.Tickers);
});
await query.ToTask(this.cts.Token);
var duration = DateTime.Now - start;
this.logger.LogInformation($"{nameof(UpdateTickersWorker)} downloaded {list.Count:n0} in {duration.Humanize()}");
Additional info, if required: a sequential test proves that the calls to the API are OK and return all 1,653 pages:
using var httpClient = new HttpClient();
var api = new PolygonWebApi(httpClient, this.apiKey);
var list = new List<TickerV2>();
var start = DateTime.Now;
// get first page
var response = await api.GetTickersAsync(BatchSize, 1, this.cts.Token);
list.AddRange(response.Tickers);
var pages = (response.Count + BatchSize - 1) / BatchSize;
// read from second page
for (var page = 2; page <= pages && this.cts.Token.IsCancellationRequested == false; page++)
{
    response = await api.GetTickersAsync(BatchSize, page, this.cts.Token);
    list.AddRange(response.Tickers);
    this.logger.LogInformation($"adding {response.Tickers.Length} records from page {page}");
}
var duration = DateTime.Now - start;
this.logger.LogInformation($"{nameof(UpdateTickersWorker)} downloaded {list.Count} in {duration.Humanize()}");
UPDATE
I modified the suggested code below to stop the pages repeating, and that seems to have resolved the issue:
IObservable<IList<TickerV2>> query =
    Observable
        .Using(
            () => new HttpClient(),
            hc =>
                from first_response in Observable.FromAsync(ct => PolygonWebApi.GetTickersAsync(hc, this.apiKey, BatchSize, 1, ct))
                let pages = (first_response.Count + BatchSize - 1) / BatchSize
                from trwp in
                    Observable
                        .Range(2, pages - 1)
                        .Select(page =>
                            Observable
                                .FromAsync(ct => PolygonWebApi.GetTickersAsync(hc, this.apiKey, BatchSize, page, ct))
                                .Select(r => new TickersResponseWithPage(page, r)))
                        .Merge(MaxConcurrentDownloads)
                        .StartWith(new TickersResponseWithPage(1, first_response))
                from tv2 in trwp.TickersResponse.Tickers
                select tv2)
        .ToList();
list = await query.ToTask(this.cts.Token);
You're doing a lot of weird mixing of synchronous code and enumerables with Rx and with Tasks. All of that makes for a big mess when debugging. You should pick a monad and stay in it the whole time; don't mix them.
Can you please try this pure Rx version of your code and let me know what kind of results you get? Please append them to the end of your question and don't change what is there.
IObservable<IList<TickerV2>> query =
    Observable
        .Using(
            () => new HttpClient(),
            hc =>
                from first_response in Observable.FromAsync(ct => api.GetTickersAsync(BatchSize, 1, ct))
                let pages = (first_response.Count + BatchSize - 1) / BatchSize
                from trwp in
                    Observable
                        .Range(2, pages - 1)
                        .SelectMany(page =>
                            Observable
                                .FromAsync(ct => api.GetTickersAsync(BatchSize, page, ct))
                                .Select(r => new TickersResponseWithPage(page, r)))
                        .StartWith(new TickersResponseWithPage(1, first_response))
                from tv2 in trwp.TickersResponse.Tickers
                select tv2)
        .ToList();
IList<TickerV2> list = await query;
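One concrete hazard of the mixing described above: the code in the question subscribes to query twice (once via Subscribe, once via ToTask), and an observable built from FromAsync is cold, so each subscription starts its own set of requests. The following is a minimal sketch of that behaviour, assuming the System.Reactive package is referenced. FetchPageAsync and ColdObservableDemo are hypothetical stand-ins for illustration, not part of PolygonWebApi.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ColdObservableDemo
{
    // Hypothetical stand-in for a paged API call (not the real PolygonWebApi).
    // It reports each invocation through onCall so the caller can count fetches.
    private static async Task<int> FetchPageAsync(int page, Action onCall, CancellationToken ct)
    {
        onCall();
        await Task.Delay(10, ct);
        return page;
    }

    // Builds a cold query over 5 pages, awaits it `subscriptions` times in sequence,
    // and returns how many fetches were started in total.
    public static async Task<int> CountFetchesAsync(int subscriptions)
    {
        var calls = 0;

        IObservable<IList<int>> query =
            Observable
                .Range(1, 5)
                .Select(page => Observable.FromAsync(
                    ct => FetchPageAsync(page, () => Interlocked.Increment(ref calls), ct)))
                .Merge(2) // at most two fetches in flight at once
                .ToList();

        for (var i = 0; i < subscriptions; i++)
        {
            await query; // awaiting an observable subscribes to it and runs the pipeline
        }

        return calls;
    }
}
```

With this sketch, CountFetchesAsync(1) returns 5 and CountFetchesAsync(2) returns 10, because every subscription re-runs all five fetches. Keeping the whole pipeline in one query with a single subscription avoids the duplicate downloads.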
Here's how to create the api object in a Defer:
IObservable<IList<TickerV2>> query =
    Observable
        .Defer(() =>
        {
            var api = new PolygonWebApi(httpClient, this.apiKey);
            return
                Observable
                    .Using(... as above ...)
                    .ToList();
        });