Search code examples
Tags: c#, asynchronous, async-await, manualresetevent, taskcompletionsource

Implement a capped and buffered job executor


I want to implement a capped and buffered job executor.

It will have a single method:

public class CappedBufferedExecutor {
  public CappedBufferedExecutor(int bufferCapping, int fillTimeInMillisec);
  public Task<bool> EnqueueAsync(string val);
}

The idea is that values are enqueued asynchronously. Once fillTimeInMillisec milliseconds have passed, or the buffer has filled to its cap of unique values, the buffered values are actually executed and all of the pending async tasks complete. While that execution is running (which could take a long time), the buffer can be refilled and new async executions can be started.

I thought of something along the lines of the following pseudocode:

  • Using a Timer, wait for the fill time to pass; once it has elapsed, create a new task that will do the work (see below).
  • On new value, lock a rwlock for read. Check if buffer is full, if so wait on a ManualResetEvent or a TaskCompletionSource.
  • Add new value to buffer (HashSet<string>).
  • If buffer is full, create a new execution task, that will lock the rwlock for write, do the work on all collected values and wake up all pending tasks using a TaskCompletionSource.
  • Wait on the TaskCompletionSource for the buffered task (mentioned in previous step) to be executed.

My problems: how to synchronize the Timer and the filled buffer check, how to wait when buffer is full, how to switch between TaskCompletionSource instances when starting to execute and allowing new values to arrive.


Solution

  • This is just a concept, so don't expect much :-)

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApp
    {
        /// <summary>
        /// Console demo: every non-blank line typed by the user is enqueued into a capped,
        /// time-limited buffer, and each flushed batch is processed by an <see cref="Executor{T}"/>.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Wires the buffer and the executor together and feeds console input into the buffer
            /// until a blank line (or end of input) is read.
            /// </summary>
            static void Main (string[] args)
            {
                var buffer = CreateBuffer ();

                var executor = new Executor<string> (SomeWork, buffer);
                executor.ProcessingStarted += Executor_ProcessingStarted;

                while (true)
                {
                    string userInput = Console.ReadLine (); // null when the input stream ends

                    // The blank line only terminates the session; it is not a job,
                    // so it must be checked BEFORE enqueueing.
                    if (string.IsNullOrWhiteSpace (userInput))
                    {
                        break;
                    }

                    buffer.Enqueue (userInput);
                }

                executor.Dispose ();
            }

            //----------------------------------------------------------------------------------------------------------------------------------

            /// <summary>
            /// Builds the demo buffer: a unique-items buffer capped at 3 items, wrapped so that
            /// it is also flushed 5 seconds after the first item of a batch arrives.
            /// </summary>
            private static IBuffer<string> CreateBuffer ()
            {
                var buffer = new UniqueItemsBuffer<string> (3);

                buffer.DataAvailable += (items) => Console.WriteLine ("BUFFER :: data available raised.");

                var alert = new Alert ();

                var bufferWithTimeout = new BufferWithTimeout<string> (buffer, alert, TimeSpan.FromSeconds (5));

                return bufferWithTimeout;
            }

            //----------------------------------------------------------------------------------------------------------------------------------

            static readonly Random rnd = new Random (); // must be outside, to avoid creating Random too quick because it will use the same seed for all tasks

            // Random instances are not thread-safe and SomeWork runs concurrently on several
            // Task.Run workers, so every access to 'rnd' goes through this lock.
            static readonly object rndLock = new object ();

            /// <summary>
            /// Simulated job: logs, blocks the calling thread for a random delay drawn from
            /// [1000, 8000) ms, logs again.
            /// </summary>
            /// <param name="x">The item being "processed"; only used in the log output.</param>
            /// <returns>Always true.</returns>
            public static bool SomeWork (string x)
            {
                int delay;

                lock (rndLock)
                {
                    delay = rnd.Next (1000, 8000);
                }

                Console.WriteLine ($"  +++ Starting SomeWork for: {x}, delay: {delay} ms");

                Thread.Sleep (delay);

                Console.WriteLine ($"  --- SomeWork for: {x} - finished.");

                return true;
            }

            //----------------------------------------------------------------------------------------------------------------------------------

            /// <summary>
            /// Reports when a whole batch of tasks has finished. The wait happens on a separate
            /// task so the thread that raised the event is not blocked.
            /// </summary>
            private static void Executor_ProcessingStarted (IReadOnlyList<Task<bool>> items)
            {
                Task.Run (() =>
                {
                    Task.WaitAll (items.ToArray ());
                    Console.WriteLine ("Finished processing tasks, count = " + items.Count);
                });
            }
        }
    
        //====== actual code ===================================================================================================================
    
        /// <summary>
        /// Handler signature that receives a read-only list of items.
        /// Introduced as a named type to simplify the event declarations in this file.
        /// </summary>
        /// <param name="items">The items handed to the handler.</param>
        public delegate void ItemsAvailable<T> (IReadOnlyList<T> items); // new type to simplify code
    
        /// <summary>Processes the given item.</summary>
        /// <param name="item">The item to process.</param>
        /// <returns>true if the job is done with success.</returns>
        public delegate bool ProcessItem<T> (T item); // processes the given item and returns true if job is done with success
    
        //======================================================================================================================================
    
        /// <summary>
        /// Exposes the event through which a buffer hands over its collected items.
        /// </summary>
        public interface IDataAvailableEvent<T>
        {
            /// <summary>
            /// Occurs when the buffer needs to be processed.
            /// The buffer should be cleared before this event is raised.
            /// </summary>
            event ItemsAvailable<T> DataAvailable; // occurs when buffer need to be processed (also before raising this event, buffer should be cleared)
        }
    
        //======================================================================================================================================
    
        /// <summary>
        /// Exposes the event an executor raises once the tasks for a batch have been created.
        /// Note: the type parameter T does not appear in the member signature below.
        /// </summary>
        public interface IProcessingStartedEvent<T>
        {
            /// <summary>
            /// Raised by the executor when all tasks are created and started;
            /// the handler receives one <see cref="Task{TResult}"/> of bool per task.
            /// </summary>
            event ItemsAvailable<Task<bool>> ProcessingStarted; // executor raises this event when all tasks are created and started
        }
    
        //======================================================================================================================================
    
        /// <summary>
        /// A buffer that collects items and hands them over through the inherited DataAvailable event.
        /// </summary>
        public interface IBuffer<T> : IDataAvailableEvent<T>
        {
            /// <summary>
            /// Adds a new item to the buffer. An implementation may ignore the item,
            /// for example if only unique items are wanted in the list.
            /// </summary>
            /// <param name="item">The item to add.</param>
            /// <returns>true = buffer is not empty, false = buffer is empty.</returns>
            bool Enqueue (T item);

            /// <summary>
            /// Should clear the buffer and raise the event
            /// (the event is not raised if the buffer was already empty).
            /// </summary>
            void FlushBuffer ();
        }
    
        //======================================================================================================================================
    
        // raises DataAvailable event when buffer cap is reached
        // ignores duplicates
    
        // you can only use this class from one thread
    
        /// <summary>
        /// Buffer of distinct items that flushes itself as soon as it holds <c>capacity</c> items.
        /// Duplicates (per the default equality comparer of <typeparamref name="T"/>) are ignored.
        /// No synchronisation is performed here; use the instance from a single thread or wrap it.
        /// </summary>
        public class UniqueItemsBuffer<T> : IBuffer<T>
        {
            /// <summary>Raised by <see cref="FlushBuffer"/> with a snapshot of the items, after the buffer has been cleared.</summary>
            public event ItemsAvailable<T> DataAvailable;

            readonly int capacity;
            readonly HashSet<T> items = new HashSet<T> ();

            /// <summary>Creates an empty buffer.</summary>
            /// <param name="capacity">Number of distinct items that triggers an automatic flush; must be at least 1.</param>
            /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 1.</exception>
            public UniqueItemsBuffer (int capacity = 10)
            {
                // A cap below 1 can never be reached by the item count after an Add,
                // so the buffer would silently never flush on its own.
                if (capacity < 1)
                {
                    throw new ArgumentOutOfRangeException (nameof (capacity), capacity, "Capacity must be at least 1.");
                }

                this.capacity = capacity;
            }

            /// <summary>
            /// Adds <paramref name="item"/> unless it is already buffered, and flushes when the cap is reached.
            /// </summary>
            /// <returns>
            /// true if the buffer holds at least one item when the call returns;
            /// false if it is empty (which is the case right after a cap-triggered flush).
            /// </returns>
            public bool Enqueue (T item)
            {
                // Only a genuinely new item can push the count up to the cap.
                if (items.Add (item) && items.Count >= capacity)
                {
                    FlushBuffer ();
                }

                return items.Count > 0;
            }

            /// <summary>
            /// Clears the buffer and raises <see cref="DataAvailable"/> with the removed items.
            /// Does not raise the event when the buffer is empty.
            /// </summary>
            public void FlushBuffer ()
            {
                Console.WriteLine ("BUFFER :: flush, item count = " + items.Count);

                if (items.Count > 0)
                {
                    // Snapshot first and clear before notifying, so handlers see an already-empty buffer.
                    var itemsCopy = items.ToList ();
                    items.Clear ();

                    DataAvailable?.Invoke (itemsCopy);
                }
            }
        }
    
        //======================================================================================================================================
    
        /// <summary>
        /// Listens to a data source and, for every batch it announces, starts one
        /// <see cref="Task"/> per item that runs the supplied work delegate.
        /// </summary>
        public class Executor<T> : IProcessingStartedEvent<T>, IDisposable
        {
            /// <summary>Raised after the tasks for a batch have been started; carries those tasks.</summary>
            public event ItemsAvailable<Task<bool>> ProcessingStarted;

            readonly ProcessItem<T> work;
            readonly IDataAvailableEvent<T> dataEvent;

            /// <summary>Stores the work delegate and subscribes to the data source.</summary>
            /// <param name="work">Invoked once per item, each call on its own task.</param>
            /// <param name="dataEvent">Source whose DataAvailable event delivers the batches.</param>
            public Executor (ProcessItem<T> work, IDataAvailableEvent<T> dataEvent)
            {
                this.work = work;
                this.dataEvent = dataEvent;

                this.dataEvent.DataAvailable += DataEvent_DataAvailable;
            }

            // Starts one task per received item, then announces the started tasks.
            private void DataEvent_DataAvailable (IReadOnlyList<T> items)
            {
                Console.WriteLine ("EXECUTOR :: new items to process available, count = " + items.Count);

                // ToList forces the projection immediately, so every task is started here, in item order.
                List<Task<bool>> startedTasks = items
                    .Select (item => Task.Run (() => work (item)))
                    .ToList ();

                Console.WriteLine ("EXECUTOR :: raising processing started event (this msg can appear later than messages from SomeWork)");

                ProcessingStarted?.Invoke (startedTasks);
            }

            /// <summary>Unsubscribes from the data source; tasks already started are not affected.</summary>
            public void Dispose () => dataEvent.DataAvailable -= DataEvent_DataAvailable;
        }
    
        //======================================================================================================================================
    
        // if you want to fill buffer using many threads - use this decorator
    
        /// <summary>
        /// Decorator that serialises <see cref="Enqueue"/> and <see cref="FlushBuffer"/> calls
        /// on the wrapped buffer with a private lock and re-raises the wrapped buffer's event.
        /// Instances are obtained through <see cref="MakeThreadSafe"/>.
        /// </summary>
        public sealed class ThreadSafeBuffer<T> : IBuffer<T>
        {
            /// <summary>Re-raised whenever the wrapped buffer raises its own DataAvailable event.</summary>
            public event ItemsAvailable<T> DataAvailable;

            readonly IBuffer<T> target;
            readonly object sync = new object ();

            private ThreadSafeBuffer (IBuffer<T> target)
            {
                this.target = target;
                this.target.DataAvailable += ForwardDataAvailable; // TODO: unpin event :P
            }

            // Passes the wrapped buffer's notification on to this decorator's subscribers.
            private void ForwardDataAvailable (IReadOnlyList<T> items)
            {
                DataAvailable?.Invoke (items);
            }

            /// <summary>Forwards to the wrapped buffer's Enqueue while holding the lock.</summary>
            public bool Enqueue (T item)
            {
                lock (sync)
                {
                    bool hasItems = target.Enqueue (item);
                    return hasItems;
                }
            }

            /// <summary>Forwards to the wrapped buffer's FlushBuffer while holding the lock.</summary>
            public void FlushBuffer ()
            {
                lock (sync)
                {
                    target.FlushBuffer ();
                }
            }

            /// <summary>
            /// Returns <paramref name="target"/> unchanged when it already is a
            /// <see cref="ThreadSafeBuffer{T}"/>; otherwise returns a new decorator around it.
            /// </summary>
            public static IBuffer<T> MakeThreadSafe (IBuffer<T> target)
            {
                bool alreadyWrapped = target is ThreadSafeBuffer<T>;

                if (!alreadyWrapped)
                {
                    return new ThreadSafeBuffer<T> (target);
                }

                return target;
            }
        }
    
        //======================================================================================================================================
    
        // and now if you want to process buffer after elapsed time
    
        /// <summary>Schedules an action to be executed after a delay.</summary>
        public interface IAlert
        {
            /// <summary>
            /// Will execute <paramref name="action"/> after the given delay (non blocking).
            /// </summary>
            /// <param name="delay">How long to wait before executing the action.</param>
            /// <param name="action">The action to execute.</param>
            /// <returns>
            /// A <see cref="CancellationTokenSource"/> for the scheduled alert. In this file,
            /// BufferWithTimeout cancels it to disable a pending alert, and Alert checks it
            /// before invoking the action.
            /// </returns>
            CancellationTokenSource CreateAlert (TimeSpan delay, Action action); // will execute 'action' after given delay (non blocking)
        }
    
        // I haven't used timers much, so I don't know whether this code is good
    
        /// <summary>
        /// <see cref="IAlert"/> implementation that uses one single-shot
        /// <see cref="System.Timers.Timer"/> per alert.
        /// </summary>
        public class Alert : IAlert
        {
            // we need to keep a reference to every pending timer to avoid dispose
            readonly List<System.Timers.Timer> timers = new List<System.Timers.Timer> ();

            // Guards 'timers': CreateAlert adds on the caller's thread while the Elapsed handlers
            // remove on timer callback threads, and List<T> is not safe for concurrent mutation.
            readonly object timersLock = new object ();

            /// <summary>
            /// Starts a timer that invokes <paramref name="action"/> once after <paramref name="delay"/>,
            /// unless the returned source has been cancelled by the time the timer elapses.
            /// </summary>
            /// <param name="delay">Time to wait; passed to the timer as milliseconds.</param>
            /// <param name="action">Callback to run when the timer elapses; it runs on the timer's callback thread.</param>
            /// <returns>
            /// A source the caller can cancel to suppress the callback. Cancelling does not stop
            /// the timer itself; the timer still elapses, is disposed, and then skips the action.
            /// </returns>
            /// <exception cref="ArgumentException">
            /// Thrown by the timer constructor when <paramref name="delay"/> is zero or negative.
            /// </exception>
            public CancellationTokenSource CreateAlert (TimeSpan delay, Action action)
            {
                var cts = new CancellationTokenSource ();

                var timer = new System.Timers.Timer (delay.TotalMilliseconds);

                lock (timersLock)
                {
                    timers.Add (timer);
                }

                timer.Elapsed += (sender, e) =>
                {
                    lock (timersLock)
                    {
                        timers.Remove (timer);
                    }

                    timer.Dispose ();

                    if (cts.Token.IsCancellationRequested) return;

                    action.Invoke ();
                };

                timer.AutoReset = false; // just one tick
                timer.Enabled = true;

                return cts;
            }
        }
    
        // thread safe (maybe :-D)
    
        /// <summary>
        /// Decorator that adds a time limit to a buffer: when an item is enqueued into a non-empty
        /// buffer and no alert is pending, an alert is scheduled that flushes the buffer after
        /// <c>timeout</c>. A flush that happens earlier (for example because the wrapped buffer
        /// reached its own cap) disables the pending alert.
        /// </summary>
        public class BufferWithTimeout<T> : IBuffer<T>
        {
            /// <summary>Re-raised whenever the wrapped buffer hands over a batch.</summary>
            public event ItemsAvailable<T> DataAvailable;

            readonly IBuffer<T> target;
            readonly IAlert     alert;
            readonly TimeSpan   timeout;

            // Non-null while an alert is considered pending; guarded by 'sync'.
            CancellationTokenSource cts;

            // Incremented every time an alert is armed or disabled (guarded by 'sync').
            // Each alert callback remembers the value it was armed with, so a callback that
            // was already on its way when its alert got disabled can recognise it is stale.
            long alertGeneration;

            readonly object sync = new object ();

            /// <summary>Wraps <paramref name="target"/> and subscribes to its notifications.</summary>
            /// <param name="target">Buffer to decorate; it is made thread-safe first.</param>
            /// <param name="alert">Scheduler used to create the timeout alerts.</param>
            /// <param name="timeout">Delay between arming an alert and the forced flush.</param>
            public BufferWithTimeout (IBuffer<T> target, IAlert alert, TimeSpan timeout)
            {
                this.target  = ThreadSafeBuffer<T>.MakeThreadSafe (target); // alert can be raised from different thread
                this.alert   = alert;
                this.timeout = timeout;

                // Subscribe to the same (wrapped) instance that every other member talks to.
                this.target.DataAvailable += Target_DataAvailable; // TODO: unpin event
            }

            // The wrapped buffer has just been flushed: disable the pending alert, then pass the batch on.
            private void Target_DataAvailable (IReadOnlyList<T> items)
            {
                lock (sync)
                {
                    DisableTimer ();
                }

                DataAvailable?.Invoke (items);
            }

            /// <summary>
            /// Enqueues into the wrapped buffer and arms an alert if the buffer is non-empty
            /// afterwards and no alert is pending.
            /// </summary>
            /// <returns>The value returned by the wrapped buffer's Enqueue.</returns>
            public bool Enqueue (T item)
            {
                lock (sync)
                {
                    bool hasItems = target.Enqueue (item); // can raise underlying flush -> dataAvailable event (will disable timer)

                    // and now if buffer is empty, we cannot start timer

                    if (hasItems && cts == null) // if timer is not enabled
                    {
                        Console.WriteLine ("TIMER :: created alert");

                        long generation = ++alertGeneration;
                        cts = alert.CreateAlert (timeout, () => HandleAlert (generation));
                    }

                    return hasItems;
                }
            }

            /// <summary>Disables the pending alert and flushes the wrapped buffer.</summary>
            public void FlushBuffer ()
            {
                lock (sync)
                {
                    DisableTimer ();
                    target.FlushBuffer ();
                }
            }

            // Alert callback. 'generation' is the value of alertGeneration at the time the alert was armed.
            private void HandleAlert (long generation)
            {
                lock (sync)
                {
                    // The alert may have been disabled (and even replaced by a newer one) while this
                    // callback was waiting for the lock; flushing now would cut the newer batch short.
                    if (generation != alertGeneration) return;

                    Console.WriteLine ("TIMER :: handler, will call buffer flush");
                    target.FlushBuffer ();
                }
            }

            // Must be called while holding 'sync'.
            private void DisableTimer ()
            {
                alertGeneration++; // invalidates a callback that is already in flight

                cts?.Cancel ();
                cts = null;

                Console.WriteLine ("TIMER :: disable");
            }
        }
    }