Search code examples
c#.netgenericsqueue

How do I design a queue with a generic Func<Task<T>>?


I have a class that calls external APIs with limited call so I need to put it in a queue and call the requests one by one. However I am not sure how I can design a queue with different return types. This is my current design:

public class MyQueue // singleton
{

    readonly ConcurrentQueue<MyQueueItem> queue = [];

    public async Task<T> EnqueueAndWaitAsync<T>(Func<Task<T>> func)
    {
        var item = new MyQueueItem<T>(func);
        queue.Enqueue(item);
        return await item.Task;
    }

    public async Task<MyQueueItem> DequeueAsync()
    {
        while (true)
        {
            if (queue.TryDequeue(out var item))
            {
                return item;
            }
            await Task.Delay(100);
        }
    }
}

public class MyQueueItem { }

public class MyQueueItem<T>(Func<Task<T>> func) : MyQueueItem
{

    public Func<Task<T>> Func => func;

    readonly TaskCompletionSource<T> tcs = new();
    public Task<T> Task => tcs.Task;

}

My API service would enqueue and wait like this:

public class MyApi(MyQueue queue)
{

    public async Task<int> GetInt() => await queue.EnqueueAndWaitAsync(async () =>
    {
        await Task.Delay(1000);
        return 1;
    });

    public async Task<string> GetString() => await queue.EnqueueAndWaitAsync(async () =>
    {
        await Task.Delay(2000);
        return "Hello";
    });

}

Now the problem is, I need a processor that calls Dequeue and actually executes the functions. However I cannot know which method is being called. Is there anyway so I don't have to convert them all to object or use Reflection?

public class MyQueueProcessor(MyQueue queue)
{

    public async Task DoWorkAsync()
    {
        while (true) // In production I would wait here for API throttling
        {
            var item = await queue.DequeueAsync();
            // How do I know which type to cast to call Func()?
        }
    }

}

I have full control of the code so I can change any of the above code.


Solution

  • public class MyQueueItem { }
    

    That's your solution. You've got a non-generic queue item that anyone can interact with without knowing T, but you're not using it!

    public class MyQueueItem
    {
        public abstract Task ProcessAsync();
    }
    

    There, now anyone can execute a queue item. Let's implement it:

    public class MyQueueItem<T>(Func<Task<T>> func) : MyQueueItem
    {
        private readonly TaskCompletionSource<T> tcs = new();
        public Task<T> Task => tcs.Task;
    
        public override async Task ProcessAsync()
        {
            try
            {
                var result = await func();
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        }
    }
    

    Now MyQueueItem<T> knows how to execute itself, and through the joys of encapsulation you've hidden that from anyone who doesn't know what T is.