c# silverlight caching enterprise-library system.reactive

Rx Amb extension


I'm working with the Reactive framework for Silverlight and would like to achieve the following.

I am trying to create a typical data provider for a Silverlight client that also takes advantage of the caching framework available in the Microsoft Enterprise Library. The scenario requires that I check the cache for the key-value pair before hitting the WCF data client.

By using the Rx extension Amb, I am able to pull the data from the cache or the WCF data client, whichever returns first, but how can I stop the WCF client from executing the call if the value is in the cache?

I would also like to handle race conditions. For example, if the first subscriber requests some data and the provider is still fetching it from the WCF data client (async), how do I prevent subsequent async requests from doing the same thing? At that stage, the cache has yet to be populated.


Solution

  • I had exactly the same problem. I solved it with an extension method with the following signature:

    IObservable<R> FromCacheOrFetch<T, R>(
        this IObservable<T> source,
        Func<T, R> cache,
        Func<IObservable<T>, IObservable<R>> fetch,
        IScheduler scheduler) where R : class
    

    Effectively, this takes the source observable and returns an observable that matches each input value with its output value.

    To get each output value, it checks the cache first. If the value exists in the cache, it uses that. If not, it spins up the fetch function, and only for the values that weren't in the cache. If all of the values are in the cache, the fetch function is never spun up, so there is no service connection setup penalty, etc.
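
    As a rough illustration of that cache-first idea only (not the implementation given further down), here is a much simpler sketch. It assumes the Rx `System.Reactive.Linq` namespace is referenced, and the names `FromCacheOrFetchOne` and `fetchOne` are hypothetical. Unlike the signature above, it fetches each miss individually rather than feeding all misses through one shared fetch pipeline, and it takes no scheduler.

    ```csharp
    using System;
    using System.Reactive.Linq;

    public static class CacheFirstSketch
    {
        // Hypothetical simplified variant: 'fetchOne' fetches a single value,
        // unlike the batched 'fetch' parameter in the signature above.
        public static IObservable<R> FromCacheOrFetchOne<T, R>(
            this IObservable<T> source,
            Func<T, R> cache,
            Func<T, IObservable<R>> fetchOne) where R : class
        {
            return source
                .Select(t =>
                {
                    var cached = cache(t);
                    // Hit: wrap the cached value.
                    // Miss: defer the fetch until Concat subscribes to it.
                    return cached != null
                        ? Observable.Return(cached)
                        : Observable.Defer(() => fetchOne(t));
                })
                // Subscribes to one inner sequence at a time, preserving input order.
                .Concat();
        }
    }
    ```

    If every key is a cache hit, `fetchOne` is never invoked. The trade-off is that each miss starts its own fetch, so any per-call connection cost is paid once per miss.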

    I'll give you the code, but it's based on a slightly different version of the extension method that uses a Maybe<T> monad - so you might find you need to fiddle with the implementation.

    Here it is:

        public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source, Func<T, R> cache, Func<IObservable<T>, IObservable<R>> fetch, IScheduler scheduler)
            where R : class
        {
            return source.FromCacheOrFetch<T, R>(t => cache(t).ToMaybe(null), fetch, scheduler);
        }
    
        public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source, Func<T, Maybe<R>> cache, Func<IObservable<T>, IObservable<R>> fetch, IScheduler scheduler)
        {
            var results = new Subject<R>();
    
            var disposables = new CompositeDisposable();
    
            var loop = new EventLoopScheduler();
            disposables.Add(loop);
    
            var sourceDone = false;
            var pairsDone = true;
            var exception = (Exception)null;
    
            var fetchIn = new Subject<T>();
            var fetchOut = (IObservable<R>)null;
            var pairs = (IObservable<KeyValuePair<int, R>>)null;
    
            var lookup = new Dictionary<T, int>();
            var list = new List<Maybe<R>>();
            var cursor = 0;
    
            Action checkCleanup = () =>
            {
                if (sourceDone && pairsDone)
                {
                    if (exception == null)
                    {
                        results.OnCompleted();
                    }
                    else
                    {
                        results.OnError(exception);
                    }
                    loop.Schedule(() => disposables.Dispose());
                }
            };
    
            Action dequeue = () =>
            {
                while (cursor != list.Count)
                {
                    var mr = list[cursor];
                    if (mr.HasValue)
                    {
                        results.OnNext(mr.Value);
                        cursor++;
                    }
                    else
                    {
                        break;
                    }
                }
            };
    
            Action<KeyValuePair<int, R>> nextPairs = kvp =>
            {
                list[kvp.Key] = Maybe<R>.Something(kvp.Value);
                dequeue();
            };
    
            Action<Exception> errorPairs = ex =>
            {
                fetchIn.OnCompleted();
                pairsDone = true;
                exception = ex;
                checkCleanup();
            };
    
            Action completedPairs = () =>
            {
                pairsDone = true;
                checkCleanup();
            };
    
            Action<T> sourceNext = t =>
            {
                var mr = cache(t);
                list.Add(mr);
                if (mr.IsNothing)
                {
                    lookup[t] = list.Count - 1;
                    if (fetchOut == null)
                    {
                        pairsDone = false;
                        fetchOut = fetch(fetchIn.ObserveOn(Scheduler.ThreadPool));
                        pairs = fetchIn.Select(x => lookup[x]).Zip(fetchOut, (i, r2) => new KeyValuePair<int, R>(i, r2));
                        disposables.Add(pairs.ObserveOn(loop).Subscribe(nextPairs, errorPairs, completedPairs));
                    }
                    fetchIn.OnNext(t);
                }
                else
                {
                    dequeue();
                }
            };
    
            Action<Exception> errorSource = ex =>
            {
                sourceDone = true;
                exception = ex;
                fetchIn.OnCompleted();
                checkCleanup();
            };
    
            Action completedSource = () =>
            {
                sourceDone = true;
                fetchIn.OnCompleted();
                checkCleanup();
            };
    
            disposables.Add(source.ObserveOn(loop).Subscribe(sourceNext, errorSource, completedSource));
    
            return results.ObserveOn(scheduler);
        }
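
    The code above relies on a Maybe<T> type that isn't shown. The following is only a guess at a minimal shape that would satisfy the members the code uses (`HasValue`, `IsNothing`, `Value`, `Maybe<R>.Something` and the `ToMaybe` extension). The `Nothing()` factory and everything about the internals are assumptions, so adjust it to whatever Maybe implementation you actually have.

    ```csharp
    using System;
    using System.Collections.Generic;

    // Hypothetical minimal Maybe<T>; the answer's real type is not shown.
    public struct Maybe<T>
    {
        private readonly T _value;
        private readonly bool _hasValue;

        private Maybe(T value)
        {
            _value = value;
            _hasValue = true;
        }

        public bool HasValue { get { return _hasValue; } }
        public bool IsNothing { get { return !_hasValue; } }

        public T Value
        {
            get
            {
                if (!_hasValue) throw new InvalidOperationException("Maybe has no value.");
                return _value;
            }
        }

        public static Maybe<T> Something(T value) { return new Maybe<T>(value); }

        // default(Maybe<T>) leaves _hasValue false, so it represents "nothing".
        public static Maybe<T> Nothing() { return default(Maybe<T>); }
    }

    public static class MaybeExtensions
    {
        // Treats any value equal to 'nothing' (e.g. null) as missing.
        public static Maybe<T> ToMaybe<T>(this T value, T nothing)
        {
            return EqualityComparer<T>.Default.Equals(value, nothing)
                ? Maybe<T>.Nothing()
                : Maybe<T>.Something(value);
        }
    }
    ```

    With this shape, `cache(t).ToMaybe(null)` in the first overload turns a null cache result into "nothing" and anything else into a value.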
    

    Example usage would look like this:

    You would have a source of the keys whose values you want to fetch:

    IObservable<X> source = ...
    

    You would have a function that can get values from the cache and an action that can put them in (and both should be thread-safe):

    Func<X, Y> getFromCache = x => ...;
    Action<X, Y> addToCache = (x, y) => ...;
    

    Then you would have the actual call to go get the data from your database or service:

    Func<X, Y> getFromService = x => ...;
    

    Then you could define fetch like so:

    Func<IObservable<X>, IObservable<Y>> fetch =
        xs => xs.Select(x =>
        {
            var y = getFromService(x);
            addToCache(x, y);
            return y;
        });
    

    And finally you can make your query by calling the following:

    IObservable<Y> results =
        source.FromCacheOrFetch(
            getFromCache,
            fetch,
            Scheduler.ThreadPool);
    

    Of course, you would need to subscribe to results for the computation to actually take place.