c# async-await httpclient system.reactive

How can I use Rx with C# await calls to retrieve JSON data from the web?


I have a method that calls an API with HttpClient and builds a list of customers as IEnumerable<Customer>. The API returns only 100 customers at a time and includes a link in the JSON content for fetching the next 100.

How can I structure this code to fetch the records iteratively until all of them are retrieved, build one large IEnumerable<Customer>, and return it from this method? I'm looking for a solution that uses Rx.

async Task<IEnumerable<Customer>> GetCustomers(string url)
{
  HttpClient client = new HttpClient();
  HttpResponseMessage response = await client.GetAsync(url);
  response.EnsureSuccessStatusCode();
  string responseBody = await response.Content.ReadAsStringAsync();
  
  // TODO: Deserialize responseBody and build a new IEnumerable<Customer>
}

JSON:

{
    "nextRecords": "\\customers\\123",
    "customers": [
        {
            "name": "John Doe"
        },
        {
            "name": "Mary Doe"
        }
    ]
}

Solution

  • I've changed the signature from Task<IEnumerable<Customer>> to IObservable<Customer> to make it more Rx-like.

    You need to define the Func<JObject, IObservable<Customer>> createCustomers function.
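
As a rough idea of what createCustomers could look like, here is a minimal sketch. It assumes Json.NET (Newtonsoft.Json) and System.Reactive are referenced, that Customer has a single Name property (the question does not show the type, so this one is hypothetical), and that each page carries a "customers" array as in the sample JSON.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using Newtonsoft.Json.Linq;

// Hypothetical Customer type; the question does not show its definition.
public class Customer
{
    public string Name { get; set; }
}

public static class CustomerParsing
{
    // Maps one parsed page to a stream of customers.
    // A page without a "customers" array produces an empty stream.
    public static readonly Func<JObject, IObservable<Customer>> CreateCustomers = jo =>
        (jo["customers"] as JArray ?? new JArray())
            .Select(c => new Customer { Name = (string)c["name"] })
            .ToObservable();
}
```

Inside GetCustomers you could then assign CustomerParsing.CreateCustomers to the createCustomers variable, or inline the same lambda.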

    Try having a go at this:

    IObservable<Customer> GetCustomers(string url)
    {
        // Maps one parsed page to its customers; left for you to define.
        Func<JObject, IObservable<Customer>> createCustomers = jo => { ... };
        return Observable.Create<Customer>(o =>
        {
            // Address of the next page; updated after every response.
            var final_url = url + "\\customers";
            return
                // Repeat the request until the next link is empty, i.e. final_url == url.
                Observable
                    .While(
                        () => final_url != url,
                        Observable
                            .Using(
                                () => new HttpClient(),
                                client =>
                                    from x in Observable.FromAsync(() => client.GetAsync(final_url))
                                    from y in Observable.Start(() =>
                                    {
                                        x.EnsureSuccessStatusCode();
                                        return x;
                                    })
                                    from z in Observable.FromAsync(() => y.Content.ReadAsStringAsync())
                                    from w in Observable.Start(() =>
                                    {
                                        var j = JObject.Parse(z);
                                        // A missing or null nextRecords yields url itself, which ends the loop.
                                        final_url = url + (string)j["nextRecords"];
                                        return createCustomers(j);
                                    })
                                    // Flatten the page's customers into the output stream.
                                    from v in w
                                    select v))
                    .Subscribe(o);
        });
    }
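
If you still need the original Task<IEnumerable<Customer>> shape, one option is to buffer the observable until it completes. The sketch below assumes System.Reactive is referenced; ToEnumerableAsync is a hypothetical helper name, not part of Rx. It relies on the fact that an IObservable<T> can be awaited (yielding its last element) and that ToList emits a single list when the source completes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ObservableCollecting
{
    // Hypothetical helper: collects every element of the source into one list.
    // The task finishes only when the source completes, and faults if the source errors.
    public static async Task<IEnumerable<T>> ToEnumerableAsync<T>(this IObservable<T> source)
    {
        IList<T> items = await source.ToList();
        return items;
    }
}
```

A call such as `IEnumerable<Customer> all = await GetCustomers(url).ToEnumerableAsync();` would then wait for every page before returning. Note that this gives up the streaming behaviour, so subscribing to the observable directly is preferable when customers can be processed as they arrive.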