Search code examples
c#unit-testing.net-corexunit.nettpl-dataflow

C# .NET Core Dataflow Pattern Unit Test does not wait for action to complete


There is a .NET Core Worker Service that has Dataflow pattern used to do the parallel process. It has TransformBlock and ActionBlock.

While running the Worker process both TransformBlock and ActionBlock execute and return the result but XUnit Test run only do TransformBlock and does not wait for ActionBlock.

I tried adding await for ActionBlock but that is not continue after that statement.

Please advise

public class ServiceDataflow(IConfiguration configuration,
          ILogger<WorkerServiceDataflow> logger,
          IEService eService,
          IOptions<ENSettings> enSettings) : IServiceDataflow
{
    private enSettings _enSettings { get; } = enSettings.Value;

    public async Task<DataTable> StartProcessing(IEnumerable<ENQueue> items)
    {
        DataTable Log = new DataTable();
        Log.Columns.Add("Colum1", typeof(string));
        Log.Columns.Add("Colum2", typeof(Decimal));

        // Define dataflow blocks
        var processingBlock = new TransformBlock<ENQueue, IResult>(async email =>
        {
            // Perform asynchronous processing on the item
            try
            {
                var udTemplate = ReplacePlaceholder(email);
                string name = string.Format("{0} {1}", email.Last_Name, email.First_Name);
                await emailService.SendEmail(email.Address, email.Subject, udTemplate, name, _enSettings.IsSandBox);
                return new Result { ProcessedData = email, Status = "Success", IsProcessed = true };
            }
            catch (Exception ex)
            {
                var exceptionMsg = string.Format("ServiceParallel: Error in TransformBlock: {0} - {1} - {2}", DateTime.Now, email.Address, ex.Message);
                logger.LogError(ex, exceptionMsg);
                return new Result { ProcessedData = email, Status = "Error" , IsProcessed = false};
            }
        });

        var consumerBlock = new ActionBlock<IResult>(result =>
        {
            // Process the result (e.g., store in database, log)
            if (result.IsProcessed)
            {
                // Access the result if the task returned a value
                DataRow newRow = Log.NewRow();
                newRow["Column1"] = result.ProcessedData?.Address;
                newRow["Column2"] = result.ProcessedData?.TemplateId;
                 
                Log.Rows.Add(newRow);
                logger.LogInformation("Email Send at: {0} Recipient: {1} status{2} ", DateTimeOffset.Now, result.ProcessedData?.Address, result.Status);                   
            }
            else
            {
                logger.LogInformation("Error processing email at: {0} Recipient: {1}", DateTimeOffset.Now, result.ProcessedData?.Address);
            }
        });

        // Link the blocks
        processingBlock.LinkTo(consumerBlock);

        // Post items for processing
        foreach (var item in items)
        {
            await processingBlock.SendAsync(item);
        }

        // Signal completion of adding items
        processingBlock.Complete();

        // Wait for processing to finish using Task.WaitAll
        var completionTask = Task.WhenAll(processingBlock.Completion);

        await completionTask;
        return Log;
    }
}

The unit test

public class ServiceDataflowTest : ServiceDataflowFixture
{
    [Fact]
    public async Task StartProcessing_ProcessesItemsSuccessfully_ReturnsDataTable()
    {
        try
        {
            // Mock dependencies
            _mockOptions.Setup(m => m.Value).Returns(new EmailNotificationSettings { IsSandBox = true, 
                UnSubscribeURL= "https://serverurl/Unsubscribe" ,
                ProcessUser = "Test1",
                url = "https://serverurl"
            });

            // Setup for LogInformation with any message and arguments
            _mockLogger.Setup(m => m.Log(
              It.IsAny<LogLevel>(),
              It.IsAny<EventId>(),
              It.IsAny<object>(), // Capture any object passed as state
              It.IsAny<Exception>(),
              It.IsAny<Func<object, Exception, string>>() // Don't care about formatter
            )).Verifiable();

            _mockEService.Setup(m => m.SendEmail(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>()))
.Returns(Task.FromResult(new SendGrid.Response(HttpStatusCode.OK, null, null))); // Assuming this constructor exists

            // Sample data
            var sampleItem = new ENQueue
            {
                Email_Address = "xyx@example.com",
                Subject = "qwqwqw",
            };
            var items = new List<ENQueue>() { sampleItem };

            // Create the Dataflow instance
            var dataflow = new ServiceDataflow(_mockConfiguration.Object, _mockLogger.Object,
                                        _mockEService.Object, _mockOptions.Object);

            // Call the method under test
            var dataTable = await dataflow.StartProcessing(items);

            // Assertions
            Assert.NotNull(dataTable);
            Assert.Single(dataTable.Rows); // Expect one row for the processed item
            /*                Assert.Equal("test@example.com", dataTable.Rows[0]["Column1"]);
                            Assert.Equal(123, dataTable.Rows[0]["Column2"]);
                            Assert.Equal(123, dataTable.Rows[0]["Column3"]);*/

            // Verify eService.Send was called with expected arguments
            _mockEService.Verify(m => m.SendEmail(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>()), Times.Once);
            //_mockLogger.Verify(m => m.LogInformation(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()), Times.AtLeastOnce);

            _mockLogger.Verify();
        }
        catch (Exception ex) { }
    }
}

The above unit test on the following line

var dataTable = await dataflow.StartProcessing(items);

It returns to before ActionBlock execute, but if I run the worker service it does.

Please advise.


Solution

  • Propagate completion from the processingBlock to the consumerBlock. Then only wait for completion on the consumerBlock.

    processingBlock.LinkTo(
         consumerBlock, 
         new DataflowLinkOptions() { PropagateCompletion = true }
    );
    foreach (var item in items)
    {
        await processingBlock.SendAsync(item);
    }
    processingBlock.Complete();
    await consumerBlock.Completion;
    

    Also you mention "parallel process" but the emailing is not performed in parallel. To do so you need to set MaxDegreeOfParallelism on the ExecutionDataflowBlockOptions. You might as well set EnsureOrdered to false also.

    var processingBlock = new TransformBlock<ENQueue, IResult>(async email =>
    {
        // Perform asynchronous processing on the item
    }, new ExecutionDataflowBlockOptions
    {
        EnsureOrdered = false,
        MaxDegreeOfParallelism = 10,
    });