Search code examples
.net-coreapache-kafkaasync-awaitdelegatesbackground-service

Background service blocks after await in ExecuteAsync in .Net Core 3.1 application


I need a service that continously poll messages from Kafka topic, give the message to a processing task, takes the result from this task end push it to another Kafka topic. I'm using BackgroundService from .Net Core 3.1 as basic infrastructure, while I wrote a ConsumerProducer class that encapsulate the Kafka stuff that has the following public method:

    public async Task ReceiveAndReply(ReplyCallback replyCallback)
    {
        try
        {
            var msg = consumer.Pull();

            var reply = await replyCallback(msg);

            producer.PushAsync((message)reply);
        }
        catch (Exception e)
        {
            throw new Exception(e.InnerException.Message);
        };

    }

The BackgroundService derived class is as follows:

public class Worker : BackgroundService{

    // initialization stuff....

   protected ReplyCallback processDelegate = new ReplyCallback(ProcessQuery);

    protected static Task<message> ProcessQuery(message source)
    {
        return new Task<message>>(() => 
        {
            
            try
            {
                var messagePayload= ProcessPulledMessage(source);

                var replyMessage = new Message(messagePayload);

                return replyMessage;
            }
            catch(Exception ex)
            {
                Console.WriteLine(ex.Message);
                return null;
            }
        });
    }
    

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await connector.ReceiveAndReply(processDelegate);
            }
            catch (Exception ex)
            {
                throw new Exception(ex.InnerException.Message);
            }
        }
    }


}

The code is simplified, but it describes the situation. While running I'm able to consume messages from topic, but when it comes to await the task in ProcessQuery the execution exits, the application continue running, but awaited task is never executed. Maybe I'm doing something completely wrong, I'm not expert in complex asynchronous works, so any tips will be very welcome. Thank you


Solution

  • You are returning a non-started task.

    The documentation for the Task constructor states that:

    Rather than calling this constructor, the most common way to instantiate a Task object and launch a task is by calling the static Task.Run(Action) or TaskFactory.StartNew(Action) method.