Search code examples
c#.netreactiverx.net

Act on each DynamicData SourceCache items with constraint and pause support


What I'm trying to do here is to iterate over each item of SourceCache with a little time between each run and do some async work over each item. Each item must be processed one after other, the work can't overlap. I put a little constraint after each run. The source cache data can change in any moment, it is not fixed data like the example. This example is working well, but I need a way to pause the processing using a IObservable<bool> which send true when more work can be processed and false otherwise.

I don't know if this is the best approach to this problem, but maybe there is another way to do the same. Try to stick to reactive code if possible.

As a side note I'm using Net Framework 4.6.2 and DynamicData 7.1.1, and most important I'm really new to programming (just for fun) and Rx.Net.

var data = Observable.Range(0, 20).Select(value => new Work(value));

ISourceCache<Work, int> source = new SourceCache<Work, int>(x => x.Id);

using var populate = source.PopulateFrom(data);

var observ = source.Connect().AsObservableCache();

using var _ = observ.Connect()
    .AutoRefresh(x => x.Status)
    .Filter(x => x.Status == Status.Queue)
    .ToCollection()
    .Select(collection => collection.ToObservable())
    .Switch()
    .Take(1)
    // How to pause just before status change
    .Do(work => work.Status = Status.Running)
    .Select(x => Run(x).ToObservable())
    .Concat()
    .Concat(Observable.Empty<Work>().Delay(TimeSpan.FromMilliseconds(250)))
    .Repeat()
    .Subscribe();

using var ignore = toggle.Connect();
private static async Task<Work> Run(Work work)
{
    // This is fixed but the time needed to run each work is not constant
    await Task.Delay(TimeSpan.FromSeconds(1));
    return work;
}

public enum Status
{
    Queue,
    Running
}

public class Work : AbstractNotifyPropertyChanged
{
    private Status _status;

    public Work(int id)
    {
        Id = id;
        Status = Status.Queue;
    }


    public int Id { get; }

    public Status Status
    {
        get => _status;
        set => SetAndRaise(ref _status, value);
    }

    public override string ToString()
    {
        return $"Id: {Id:000} - Status: {Status}";
    }
}

Solution

  • After some brain crushing sessions I ended with something I can call a solution for my problem.

            var toggle = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5))
                .SwitchState(false)
                .Do(Console.WriteLine)
                .Replay(1);
    
            var firstWork = observ.Connect()
                .AutoRefresh(x => x.Status)
                .Filter(x => x.Status == Status.Queue)
                .ToCollection()
                .Select(collection => collection.FirstOrOptional(x => true))
                .StartWith(Optional<Work>.None);
    
            var getWork = Observable.Timer(TimeSpan.FromMilliseconds(250))
                .WithLatestFrom(firstWork, (_, first) => first);
    
            var selector = toggle.Select(value => value ? 
                    getWork :
                    Observable.Empty<Optional<Work>>())
                .Switch()
                .Take(1)
                .Where(optional => optional.HasValue)
                .Select(optional => optional.Value)
                .Do(Console.WriteLine)
                .Do(work => work.Status = Status.Running)
                .Select(x => Run(x).ToObservable())
                .Concat()
                .Do(Console.WriteLine)
                .Repeat();
    
            using var executing = selector.Subscribe();
    
            using var pauseChange = toggle.Connect();
    

    toggle simulate a pause and unpause every 5 seconds, but in my case will be the status of a toggle button.

    When paused the selector return an empty observable with not real work to do.

    When unpaused the stream gets one and only one Work from collection after an initial due time. Then the background jobs is launched until finished and everything get started again.

    In my real case I start with an empty collection this is why I have to do some magic with the Optional<T> implementation from DynamicData.

    I think this solve my problem but I´m open to another solution.