c# | .net | asynchronous | windows-services | tpl-dataflow

How to implement a continuous producer-consumer pattern inside a Windows Service


Here's what I'm trying to do:

  • Keep a queue in memory of items that need to be processed (i.e. IsProcessed = 0)
  • Every 5 seconds, get unprocessed items from the db, and if they're not already in the queue, add them
  • Continuously pull items from the queue, process them, and each time an item is processed, update it in the db (IsProcessed = 1)
  • Do this all "as parallel as possible"

I have a constructor for my service like

public MyService()
{
    Ticker.Elapsed += FillQueue;
}

and I start that timer when the service starts like

protected override void OnStart(string[] args)
{
    Ticker.Enabled = true;
    Task.Run(() => { ConsumeWork(); });
}

and my FillQueue is like

private static async void FillQueue(object source, ElapsedEventArgs e)   
{
    var items = GetUnprocessedItemsFromDb();
    foreach(var item in items)
    {
        if(!Work.Contains(item))
        {
            Work.Enqueue(item);
        }   
    }
}

and my ConsumeWork is like

private static void ConsumeWork()
{
    while(true)
    {
        if(Work.Count > 0)
        {
            var item = Work.Peek();
            Process(item);
            Work.Dequeue();
        }
        else
        {
            Thread.Sleep(500);
        }
    }
}

However, this is probably a naive implementation, and I'm wondering whether .NET has any class that is designed for exactly this type of situation.


Solution

  • Though @JSteward's answer is a good start, you can improve it by combining the TPL Dataflow and Rx.NET extensions, as a dataflow block can easily become an observer for your data, and with an Rx timer it will be much less effort for you (Rx.Timer explanation).

    We can adapt the MSDN article to your needs, like this:

    private const int EventIntervalInSeconds = 5;
    private const int DueIntervalInSeconds = 60;
    
    var source =
        // sequence of Int64 numbers, starting from 0
        // https://msdn.microsoft.com/en-us/library/hh229435.aspx
        Observable.Timer(
            // fire the first event after a 1-minute wait
            TimeSpan.FromSeconds(DueIntervalInSeconds),
            // then fire an event every 5 seconds
            TimeSpan.FromSeconds(EventIntervalInSeconds))
        // each number will have a timestamp
        .Timestamp()
        // on each event, select some items to process
        .SelectMany(GetItemsFromDB)
        // filter out items that are already being processed
        .Where(i => !_processedItemIds.ContainsKey(i.Id));
    
    var action = new ActionBlock<Item>(ProcessItem, new ExecutionDataflowBlockOptions
        {
            // we can start as many item processing as processor count
            MaxDegreeOfParallelism = Environment.ProcessorCount,
        });
    
    IDisposable subscription = source.Subscribe(action.AsObserver());
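
    For completeness: this snippet assumes the System.Reactive and System.Threading.Tasks.Dataflow NuGet packages, and needs roughly this set of usings:

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;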
    

    Also, your check for an item being already processed isn't quite accurate, as there is a possibility that an item gets selected as unprocessed from the db right at the moment you've finished processing it but haven't yet updated it in the database. In that case the item would be removed from the Queue<T> and then added to it again by the producer. This is why I've added a ConcurrentDictionary<Guid, byte>, used as a thread-safe set, to this solution (HashSet<T> isn't thread-safe, and ConcurrentBag<T> has no way to remove a specific item):

    private static async Task ProcessItem(Item item)
    {
        // TryAdd is atomic, so a concurrent duplicate can't slip past the check
        if (!_processedItemIds.TryAdd(item.Id, 0))
        {
            return;
        }

        // actual work here

        // save item as processed in database

        // we need to wait to ensure the item doesn't appear in the queue again
        await Task.Delay(TimeSpan.FromSeconds(EventIntervalInSeconds * 2));

        // clear the processed cache to reduce memory usage
        _processedItemIds.TryRemove(item.Id, out _);
    }
    
    public class Item
    {
        public Guid Id { get; set; }
    }
    
    // temporary cache for items in process: a ConcurrentDictionary used as a thread-safe set
    private static ConcurrentDictionary<Guid, byte> _processedItemIds =
        new ConcurrentDictionary<Guid, byte>();
    
    private static IEnumerable<Item> GetItemsFromDB(Timestamped<long> time)
    {
        // log event timing
        Console.WriteLine($"Event # {time.Value} at {time.Timestamp}");
    
        // return items from DB
        return new[] { new Item { Id = Guid.NewGuid() } };
    }
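
    In a real service the stub above would run the actual query. A minimal sketch, assuming a hypothetical Items table with Id and IsProcessed columns, a ConnectionString constant, and System.Data.SqlClient:

    private static IEnumerable<Item> GetItemsFromDB(Timestamped<long> time)
    {
        // log event timing
        Console.WriteLine($"Event # {time.Value} at {time.Timestamp}");

        // select only the rows that still need processing
        // (table name, columns and ConnectionString are placeholders)
        using (var connection = new SqlConnection(ConnectionString))
        using (var command = new SqlCommand(
            "SELECT Id FROM Items WHERE IsProcessed = 0", connection))
        {
            connection.Open();
            var items = new List<Item>();
            using (var reader = command.ExecuteReader())
            {
                while (reader.Read())
                {
                    items.Add(new Item { Id = reader.GetGuid(0) });
                }
            }
            return items;
        }
    }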
    

    You can implement the cache cleanup in another way, for example, by starting a "GC" timer which removes processed items from the cache on a regular basis.
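
    A minimal sketch of that idea, assuming a hypothetical variant of the cache that stores each item's completion time:

    // hypothetical cache variant: completion time per item id
    private static readonly ConcurrentDictionary<Guid, DateTime> _cache =
        new ConcurrentDictionary<Guid, DateTime>();

    // "GC" timer: every polling interval, sweep entries older than two intervals
    private static readonly IDisposable _cacheCleanup =
        Observable.Interval(TimeSpan.FromSeconds(EventIntervalInSeconds))
            .Subscribe(tick =>
            {
                var cutoff = DateTime.UtcNow - TimeSpan.FromSeconds(EventIntervalInSeconds * 2);
                foreach (var pair in _cache)
                {
                    if (pair.Value < cutoff)
                    {
                        _cache.TryRemove(pair.Key, out _);
                    }
                }
            });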

    To stop producing events and processing items, you should Dispose the subscription and, optionally, Complete the ActionBlock:

    subscription.Dispose();
    action.Complete();
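
    In the Windows Service itself this could be wired into OnStop; a rough sketch, assuming the subscription and the block are kept in fields (here called _subscription and _action):

    protected override void OnStop()
    {
        // stop producing new events
        _subscription.Dispose();

        // signal the block that no more items will arrive,
        // then give in-flight items a bounded chance to finish
        _action.Complete();
        _action.Completion.Wait(TimeSpan.FromSeconds(30));
    }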
    

    You can find more information about Rx.NET in their guidelines on GitHub.