Avoiding multiple calls in observable pipeline


I am trying to create a GetAndFetch method that would first return data from the cache, then fetch and return data from a webservice, and finally update the cache.

Such a function already exists in Akavache; however, the data it retrieves or stores is treated as a single blob. For example, if I am interested in an RSS feed, I can only work at the level of the whole feed and not of individual items. I am interested in creating a version that returns the items as IObservable<Item>. This has the advantage that new Items can be displayed as soon as they are returned by the service, without waiting for all of the Items.

public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
    // The basic idea is to first get the cached objects
    IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);

    // Then call the service   
    IObservable<Item> fetchObs = service.GetItems(feedUrl);

    // Consolidate the cache & the retrieved data and then update cache
    IObservable<Item> updateObs = fetchObs
                                      .ToArray()
                                      .MyFilter() // filter out duplicates between retrieved data and cache
                                      .SelectMany(arg =>
                                      {
                                          return cache.InsertObject(feedUrl, arg)
                                          .SelectMany(__ => Observable.Empty<Item>());
                                      });

    // Then make sure cache retrieval, fetching and update are done in order
    return cacheBlobObject.SelectMany(x => x.ToObservable())
                .Concat(fetchObs)
                .Concat(updateObs);
}

The issue with my approach is that Concat(updateObs) resubscribes to fetchObs and ends up calling service.GetItems(feedUrl) again, which is wasteful.


Solution

  • It sounds like you need the .Publish(share => { ... }) overload, which shares a single subscription to the source among everything that uses it inside the lambda.

    Try this:

    public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
    {
        // The basic idea is to first get the cached objects
        IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);
    
        return
            service
                .GetItems(feedUrl)
                .Publish(fetchObs =>
                {
                    // Consolidate the cache & the retrieved data and then update cache
                    IObservable<Item> updateObs =
                        fetchObs
                            .ToArray()
                            .MyFilter() // filter out duplicates between retrieved data and cache
                            .SelectMany(arg =>
                                cache
                                    .InsertObject(feedUrl, arg)
                                    .SelectMany(__ => Observable.Empty<Item>()));
    
                    // Then make sure cache retrieval, fetching and update are done in order
                    return
                        cacheBlobObject
                            .SelectMany(x => x.ToObservable())
                            .Concat(fetchObs)
                            .Concat(updateObs);
                });
    }
    

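    I'm concerned about the Concat calls. Inside Publish, fetchObs is hot, and Concat only subscribes to each sequence once the previous one has completed, so items emitted before that point can be missed. The calls might need to be Merge.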

    Also, it seems like your call to service.GetItems is getting all items anyway - how is it avoiding the items already in the cache?
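
    A small self-contained sketch can make the effect of Publish visible. It assumes the System.Reactive package is referenced, uses a hypothetical Observable.Defer source as a stand-in for service.GetItems(feedUrl), and counts how often that source is subscribed with and without Publish. PublishDemo and CountSubscriptions are illustrative names, not part of Akavache or of the code above. It uses Merge rather than Concat so that both branches are subscribed before the shared source starts emitting.

    ```csharp
    using System;
    using System.Reactive.Linq;

    public static class PublishDemo
    {
        // Returns how many times the cold source was subscribed.
        public static int CountSubscriptions(bool share)
        {
            int subscriptions = 0;

            // Hypothetical stand-in for service.GetItems(feedUrl): a cold
            // observable, so every subscription re-runs the "service call".
            IObservable<int> source = Observable.Defer(() =>
            {
                subscriptions++;
                return Observable.Range(1, 3);
            });

            // Two consumers of the same sequence: the items themselves, plus a
            // side branch that collects them and then emits nothing.
            Func<IObservable<int>, IObservable<int>> pipeline = items =>
                items.Merge(items.ToArray().SelectMany(_ => Observable.Empty<int>()));

            IObservable<int> result = share
                ? source.Publish(pipeline)   // both branches share one subscription
                : pipeline(source);          // each branch subscribes on its own

            result.Subscribe(_ => { });
            return subscriptions;
        }

        public static void Main()
        {
            Console.WriteLine(CountSubscriptions(share: false)); // prints 2
            Console.WriteLine(CountSubscriptions(share: true));  // prints 1
        }
    }
    ```

    Without Publish, each use of the source inside the pipeline triggers its own subscription; with Publish, the lambda receives a shared sequence and the underlying source is subscribed once.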


    An alternative implementation based on the comments:

    public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
    {
        return
        (
            from hs in cache.GetObject<HashSet<Item>>(feedUrl)
            let ids = new HashSet<string>(hs.Select(x => x.Id))
            select
                hs
                    .ToObservable()
                    .Merge(
                        service
                            .GetItems(feedUrl)
                            .Where(x => !ids.Contains(x.Id))
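                            // Caution: InsertObject replaces the whole entry stored under feedUrl,
                            // so each call here overwrites the cache with a single-item array.
                            .Do(x => cache.InsertObject(feedUrl, new [] { x })))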
        ).Merge();
    }