I am trying to create a GetAndFetch method that first returns data from the cache, then fetches and returns data from a web service, and finally updates the cache.
Such a function already exists in akavache; however, the data it retrieves or stores is treated as a single blob. For example, if I am interested in an RSS feed, I can only work at the level of the whole feed and not individual items. I am interested in creating a version that returns the items as IObservable<Item>. This has the advantage that new Items can be displayed as soon as they are returned by the service, without waiting for all the Items.
    public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
    {
        // The basic idea is to first get the cached objects
        IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);
        // Then call the service
        IObservable<Item> fetchObs = service.GetItems(feedUrl);
        // Consolidate the cache & the retrieved data and then update cache
        IObservable<Item> updateObs = fetchObs
            .ToArray()
            .MyFilter() // filter out duplicates between retrieved data and cache
            .SelectMany(arg =>
            {
                return cache.InsertObject(feedUrl, arg)
                    .SelectMany(__ => Observable.Empty<Item>());
            });
        // Then make sure cache retrieval, fetching and update is done in order
        return cacheBlobObject.SelectMany(x => x.ToObservable())
            .Concat(fetchObs)
            .Concat(updateObs);
    }
The issue with my approach is that Concat(updateObs) resubscribes to fetchObs and ends up calling service.GetItems(feedUrl) again, which is wasteful.
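The resubscription described above can be reproduced in isolation. The following is a minimal sketch, not the original code: ResubscribeDemo, GetItems and Calls are hypothetical names, and Observable.Defer with Observable.Range stands in for the real service call so that each subscription can be counted.

```csharp
using System;
using System.Reactive.Linq;

public static class ResubscribeDemo
{
    // Counts how many times the (hypothetical) service call is started.
    public static int Calls;

    // Hypothetical stand-in for service.GetItems(feedUrl): a cold observable
    // that increments the counter every time someone subscribes to it.
    public static IObservable<int> GetItems() =>
        Observable.Defer(() =>
        {
            Calls++;
            return Observable.Range(1, 3);
        });

    public static int Run()
    {
        Calls = 0;
        IObservable<int> fetchObs = GetItems();

        // Same shape as the update step: collect everything, then emit nothing.
        IObservable<int> updateObs = fetchObs
            .ToArray()
            .SelectMany(_ => Observable.Empty<int>());

        // Concat subscribes to fetchObs, and updateObs later subscribes to it again.
        fetchObs.Concat(updateObs).ToList().Wait();
        return Calls;
    }

    public static void Main() => Console.WriteLine(Run());
}
```

Because fetchObs is cold, each of the two subscriptions starts its own run of the source, which is why the counter ends up above one.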
It sounds like you need the .Publish(share => { ... }) overload.
Try this:
    public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
    {
        // The basic idea is to first get the cached objects
        IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);
        return
            service
                .GetItems(feedUrl)
                .Publish(fetchObs =>
                {
                    // Consolidate the cache & the retrieved data and then update cache
                    IObservable<Item> updateObs =
                        fetchObs
                            .ToArray()
                            .MyFilter() // filter out duplicates between retrieved data and cache
                            .SelectMany(arg =>
                                cache
                                    .InsertObject(feedUrl, arg)
                                    .SelectMany(__ => Observable.Empty<Item>()));
                    // Then make sure cache retrieval, fetching and update is done in order
                    return
                        cacheBlobObject
                            .SelectMany(x => x.ToObservable())
                            .Concat(fetchObs)
                            .Concat(updateObs);
                });
    }
I'm concerned about the Concat calls - they might need to be Merge.
Also, it seems like your call to service.GetItems is getting all items anyway - how is it avoiding the items already in the cache?
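The concern about Concat versus Merge inside Publish can be explored with a small experiment. This is only a sketch under stated assumptions: PublishOrderingDemo, Source and Subscriptions are hypothetical names, the source is a synchronous Observable.Range rather than a real web service, and the "batch" branch merely stands in for the ToArray plus cache-update step.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PublishOrderingDemo
{
    // Counts subscriptions to the (hypothetical) source.
    public static int Subscriptions;

    // Hypothetical cold source standing in for service.GetItems(feedUrl).
    static IObservable<int> Source() =>
        Observable.Defer(() =>
        {
            Subscriptions++;
            return Observable.Range(1, 3);
        });

    public static IList<string> Run(bool useMerge)
    {
        Subscriptions = 0;
        return Source()
            .Publish(shared =>
            {
                // Branch 1: pass the items through as they arrive.
                IObservable<string> items = shared.Select(x => "item " + x);

                // Branch 2: stands in for the ToArray + cache update step.
                IObservable<string> batch =
                    shared.ToArray().Select(a => "batch of " + a.Length);

                // Merge subscribes to both branches up front;
                // Concat subscribes to batch only after items has completed.
                return useMerge ? items.Merge(batch) : items.Concat(batch);
            })
            .ToList()
            .Wait();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(useMerge: false)));
        Console.WriteLine(string.Join(", ", Run(useMerge: true)));
    }
}
```

In both variants the source should be subscribed to only once. With Concat, however, the batch branch attaches to the shared sequence after it has already completed, so it appears to see an empty array, whereas with Merge it sees every item. If that holds for the real service, the cache update would need Merge (or an equivalent early subscription) to receive the fetched items.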
An alternative implementation based on the comments:
    public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
    {
        return
        (
            from hs in cache.GetObject<HashSet<Item>>(feedUrl)
            let ids = new HashSet<string>(hs.Select(x => x.Id))
            select
                hs
                    .ToObservable()
                    .Merge(
                        service
                            .GetItems(feedUrl)
                            .Where(x => !ids.Contains(x.Id))
                            .Do(x => cache.InsertObject(feedUrl, new [] { x })))
        ).Merge();
    }