Tags: c#, asynchronous, system.reactive, rx.net

Create observable from periodic async request


I want a generic way to convert an asynchronous method to an observable. In my case, I'm dealing with methods that use HttpClient to fetch data from an API.

Let's say we have the method Task<string> GetSomeData() that needs to become a single IObservable<string>, where the values are generated by a combination of:

  • Repeated periodic calls to GetSomeData() (for example every x seconds)
  • Manually triggered calls to GetSomeData() at any given time (for example when user hits refresh).

Since there are two ways to trigger execution of GetSomeData(), concurrency can be an issue. To avoid requiring GetSomeData() to be thread-safe, I want to limit the concurrency so that only one thread executes the method at a time. As a consequence, I need some strategy for handling overlapping requests. I made a (kind of) marble diagram to describe the problem and the wanted outcome:

[Image: marble diagram]

My instinct tells me there is a simple way to achieve this, so please give me some insights :)

This is the solution I've got so far. It unfortunately doesn't solve the concurrency problem.

    public class ObservableCreationWrapper<T>
    {
        private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
        private Func<Task<T>> _methodToCall;
        private IObservable<T> _manualCalls;

        public IObservable<T> Stream { get; private set; }

        public ObservableCreationWrapper(Func<Task<T>> methodToCall, TimeSpan period)
        {
            _methodToCall = methodToCall;
            _manualCalls = _manualCallsSubject.AsObservable()
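                // This FromAsync overload takes a CancellationToken (unused here);
                // the stack-trace hack in GetSomeValueAsync keys on that overload.
                .Select(_ => Observable.FromAsync(ct => methodToCall()))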
                .Merge(1);

            Stream = Observable.FromAsync(() => _methodToCall())
                .DelayRepeat(period)
                .Merge(_manualCalls);
        }

        public void TriggerAdditionalCall()
        {
            _manualCallsSubject.OnNext(Unit.Default);
        }
    }

Extension method for repeating with delay:

static class Extensions
{
    public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) => source
        .Concat(
            Observable.Create<T>(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();
}

An example of a service containing the method from which the observable is generated:

class SomeService
{
    private int _ticks = 0;

    public async Task<string> GetSomeValueAsync()
    {
        //Just a hack to determine if the request was triggered manually or by timer
        var initiationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";

        //Here we have a data race! We would like to limit access to this method
        var valueToReturn = $"{_ticks} ({initiationWay})";

        await Task.Delay(500);
        _ticks += 1; 
        return valueToReturn;
    }
}

Used like this (data race will occur):

static async Task Main(string[] args)
{
    //Running this program will yield nondeterministic results due to the data race in GetSomeValueAsync
    var someService = new SomeService();
    var stopwatch = Stopwatch.StartNew();
    var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync, TimeSpan.FromMilliseconds(2000));
    observableWrapper.Stream
        .Take(6)
        .Subscribe(x => 
            {
                Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} finished");
            });

    await Task.Delay(4000);
    observableWrapper.TriggerAdditionalCall();
    observableWrapper.TriggerAdditionalCall();
    Console.ReadLine();
}

Solution

  • Here is my take on this problem:


    Update: I was able to greatly simplify my suggested solution by borrowing ideas from Enigmativity's answer. The Observable.StartAsync method handles the messy business of cancellation automatically¹, and the requirement of non-overlapping execution can be enforced simply by using a SemaphoreSlim.

    /// <summary>
    /// Creates an observable sequence containing the results of an asynchronous
    /// function that is invoked periodically and manually. Overlapping invocations
    /// are prevented. Timer ticks that would cause overlapping are ignored.
    /// Manual invocations cancel previous invocations, and restart the timer.
    /// </summary>
    public static IObservable<T> PeriodicAndManual<T>(
        Func<bool, CancellationToken, Task<T>> functionAsync,
        TimeSpan period,
        out Action manualInvocation)
    {
        // Arguments validation omitted
        var manualSubject = new Subject<bool>();
        manualInvocation = () => manualSubject.OnNext(true);
        return Observable.Defer(() =>
        {
            var semaphore = new SemaphoreSlim(1, 1); // Ensure no overlapping
            return Observable
                .Interval(period)
                .Select(_ => false) // Not manual
                .Merge(manualSubject)
                .TakeUntil(isManual => isManual) // Stop on first manual
                .Repeat() // ... and restart the timer
                .Prepend(false) // Skip the initial interval delay
                .Select(isManual =>
                {
                    if (isManual)
                    {
                        // Triggered manually
                        return Observable.StartAsync(async ct =>
                        {
                            await semaphore.WaitAsync(ct);
                            try { return await functionAsync(isManual, ct); }
                            finally { semaphore.Release(); }
                        });
                    }
                    else if (semaphore.Wait(0))
                    {
                        // Triggered by the timer and semaphore acquired synchronously
                        return Observable
                            .StartAsync(ct => functionAsync(isManual, ct))
                            .Finally(() => semaphore.Release());
                    }
                    return null; // Otherwise ignore the signal
                })
                .Where(op => op != null)
                .Switch(); // Pending operations are unsubscribed and canceled
        });
    }
    

    The out Action manualInvocation argument is the mechanism that triggers a manual invocation.

    Usage example:

    int ticks = 0;
    var subscription = PeriodicAndManual(async (isManual, token) =>
    {
        var id = $"{++ticks} " + (isManual ? "manual" : "periodic");
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Begin {id}");
        await Task.Delay(500, token);
        return id;
    }, TimeSpan.FromMilliseconds(1000), out var manualInvocation)
    .Do(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Received #{x}"))
    .Subscribe();
    
    await Task.Delay(3200);
    manualInvocation();
    await Task.Delay(200);
    manualInvocation();
    await Task.Delay(3200);
    
    subscription.Dispose();
    

    Output:

    19:52:43.684 Begin 1 periodic
    19:52:44.208 Received #1 periodic
    19:52:44.731 Begin 2 periodic
    19:52:45.235 Received #2 periodic
    19:52:45.729 Begin 3 periodic
    19:52:46.232 Received #3 periodic
    19:52:46.720 Begin 4 periodic
    19:52:46.993 Begin 5 manual
    19:52:47.220 Begin 6 manual
    19:52:47.723 Received #6 manual
    19:52:48.223 Begin 7 periodic
    19:52:48.728 Received #7 periodic
    19:52:49.227 Begin 8 periodic
    19:52:49.730 Received #8 periodic
    19:52:50.226 Begin 9 periodic
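
The run above never exercises the path where a timer tick is ignored, because the 500 ms function always finishes within the 1000 ms period. The two ways the method acquires the semaphore (`Wait(0)` for timer ticks, `WaitAsync` for manual calls) can be isolated in a small sketch that needs no Rx. The names `GateDemo`, `TryRunAsync` and `RunQueuedAsync` are hypothetical and exist only for illustration; the sketch covers only the gating, not the cancellation performed by `Switch`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class GateDemo
{
    // Hypothetical helper mirroring the timer path: run 'work' only if the
    // gate can be taken immediately, otherwise drop the invocation.
    public static async Task<bool> TryRunAsync(SemaphoreSlim gate, Func<Task> work)
    {
        if (!gate.Wait(0)) return false; // Gate is busy: ignore this signal
        try { await work(); return true; }
        finally { gate.Release(); }
    }

    // Hypothetical helper mirroring the manual path: wait (cancellably)
    // until the gate is free, then run 'work'.
    public static async Task RunQueuedAsync(
        SemaphoreSlim gate, Func<Task> work, CancellationToken ct = default)
    {
        await gate.WaitAsync(ct);
        try { await work(); }
        finally { gate.Release(); }
    }

    public static async Task Main()
    {
        var gate = new SemaphoreSlim(1, 1);

        // Takes the gate synchronously and holds it for about 300 ms
        Task<bool> first = TryRunAsync(gate, () => Task.Delay(300));

        // The gate is still held by 'first', so this call is dropped
        bool second = await TryRunAsync(gate, () => Task.Delay(300));

        // Waits until 'first' releases the gate, then runs
        await RunQueuedAsync(gate, () => Task.CompletedTask);

        Console.WriteLine($"first={await first}, second={second}");
        // prints "first=True, second=False"
    }
}
```

In `PeriodicAndManual` the manual path rarely waits long, since `Switch` unsubscribes from the pending operation and the `Finally` on the timer path then releases the semaphore.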
    

    The technique of using the Scan and DistinctUntilChanged operators to drop elements while the previous asynchronous operation is running is borrowed from this question.
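
For readers unfamiliar with that combination, here is one possible shape of it, assuming the System.Reactive package is referenced. This is a sketch, not necessarily the code from the linked question; the operator name `DropWhileBusy` and the helper `Wrap` are hypothetical.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class DropWhileBusyExtensions
{
    // Hypothetical operator: invokes 'function' for an element only if the
    // task started for an earlier element has already completed.
    public static IObservable<TResult> DropWhileBusy<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, Task<TResult>> function)
    {
        return source
            // Carry the latest task forward; start a new one only when idle
            .Scan(Task.FromResult(default(TResult)), (previous, item) =>
                previous.IsCompleted ? Wrap(function(item)) : previous)
            // The same task instance appearing again means the element was dropped
            .DistinctUntilChanged()
            // Unwrap the result of each task, in order
            .Concat();

        // Ensures a fresh Task instance even if 'function' returns a cached one
        static async Task<TResult> Wrap(Task<TResult> task) => await task;
    }
}

public static class DropWhileBusyDemo
{
    public static void Main()
    {
        var source = new Subject<int>();
        var pending = new TaskCompletionSource<string>();
        int calls = 0;

        using var subscription = source
            .DropWhileBusy(x => { calls++; return pending.Task; })
            .Subscribe(result => Console.WriteLine($"Received {result}"));

        source.OnNext(1); // Starts an operation
        source.OnNext(2); // Dropped: the operation for 1 is still pending
        Console.WriteLine($"calls={calls}"); // 'function' has run once so far
        pending.SetResult("done");
    }
}
```

`DistinctUntilChanged` compares the tasks by reference, which is why `Wrap` is needed: a function that returns the same cached task twice would otherwise have its second result suppressed.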

    ¹ It seems that the Rx library does not handle this messy business satisfactorily though, since it just omits disposing of the CancellationTokenSources it creates.