There is a .NET Core Worker Service that has Dataflow pattern used to do the parallel process. It has TransformBlock and ActionBlock.
While running the Worker process both TransformBlock and ActionBlock execute and return the result but XUnit Test run only do TransformBlock and does not wait for ActionBlock.
I tried adding await for ActionBlock but that is not continue after that statement.
Please advise
public class ServiceDataflow(IConfiguration configuration,
ILogger<WorkerServiceDataflow> logger,
IEService eService,
IOptions<ENSettings> enSettings) : IServiceDataflow
{
private enSettings _enSettings { get; } = enSettings.Value;
public async Task<DataTable> StartProcessing(IEnumerable<ENQueue> items)
{
DataTable Log = new DataTable();
Log.Columns.Add("Colum1", typeof(string));
Log.Columns.Add("Colum2", typeof(Decimal));
// Define dataflow blocks
var processingBlock = new TransformBlock<ENQueue, IResult>(async email =>
{
// Perform asynchronous processing on the item
try
{
var udTemplate = ReplacePlaceholder(email);
string name = string.Format("{0} {1}", email.Last_Name, email.First_Name);
await emailService.SendEmail(email.Address, email.Subject, udTemplate, name, _enSettings.IsSandBox);
return new Result { ProcessedData = email, Status = "Success", IsProcessed = true };
}
catch (Exception ex)
{
var exceptionMsg = string.Format("ServiceParallel: Error in TransformBlock: {0} - {1} - {2}", DateTime.Now, email.Address, ex.Message);
logger.LogError(ex, exceptionMsg);
return new Result { ProcessedData = email, Status = "Error" , IsProcessed = false};
}
});
var consumerBlock = new ActionBlock<IResult>(result =>
{
// Process the result (e.g., store in database, log)
if (result.IsProcessed)
{
// Access the result if the task returned a value
DataRow newRow = Log.NewRow();
newRow["Column1"] = result.ProcessedData?.Address;
newRow["Column2"] = result.ProcessedData?.TemplateId;
Log.Rows.Add(newRow);
logger.LogInformation("Email Send at: {0} Recipient: {1} status{2} ", DateTimeOffset.Now, result.ProcessedData?.Address, result.Status);
}
else
{
logger.LogInformation("Error processing email at: {0} Recipient: {1}", DateTimeOffset.Now, result.ProcessedData?.Address);
}
});
// Link the blocks
processingBlock.LinkTo(consumerBlock);
// Post items for processing
foreach (var item in items)
{
await processingBlock.SendAsync(item);
}
// Signal completion of adding items
processingBlock.Complete();
// Wait for processing to finish using Task.WaitAll
var completionTask = Task.WhenAll(processingBlock.Completion);
await completionTask;
return Log;
}
}
The unit test
public class ServiceDataflowTest : ServiceDataflowFixture
{
[Fact]
public async Task StartProcessing_ProcessesItemsSuccessfully_ReturnsDataTable()
{
try
{
// Mock dependencies
_mockOptions.Setup(m => m.Value).Returns(new EmailNotificationSettings { IsSandBox = true,
UnSubscribeURL= "https://serverurl/Unsubscribe" ,
ProcessUser = "Test1",
url = "https://serverurl"
});
// Setup for LogInformation with any message and arguments
_mockLogger.Setup(m => m.Log(
It.IsAny<LogLevel>(),
It.IsAny<EventId>(),
It.IsAny<object>(), // Capture any object passed as state
It.IsAny<Exception>(),
It.IsAny<Func<object, Exception, string>>() // Don't care about formatter
)).Verifiable();
_mockEService.Setup(m => m.SendEmail(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>()))
.Returns(Task.FromResult(new SendGrid.Response(HttpStatusCode.OK, null, null))); // Assuming this constructor exists
// Sample data
var sampleItem = new ENQueue
{
Email_Address = "xyx@example.com",
Subject = "qwqwqw",
};
var items = new List<ENQueue>() { sampleItem };
// Create the Dataflow instance
var dataflow = new ServiceDataflow(_mockConfiguration.Object, _mockLogger.Object,
_mockEService.Object, _mockOptions.Object);
// Call the method under test
var dataTable = await dataflow.StartProcessing(items);
// Assertions
Assert.NotNull(dataTable);
Assert.Single(dataTable.Rows); // Expect one row for the processed item
/* Assert.Equal("test@example.com", dataTable.Rows[0]["Column1"]);
Assert.Equal(123, dataTable.Rows[0]["Column2"]);
Assert.Equal(123, dataTable.Rows[0]["Column3"]);*/
// Verify eService.Send was called with expected arguments
_mockEService.Verify(m => m.SendEmail(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>()), Times.Once);
//_mockLogger.Verify(m => m.LogInformation(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()), Times.AtLeastOnce);
_mockLogger.Verify();
}
catch (Exception ex) { }
}
}
The above unit test on the following line
var dataTable = await dataflow.StartProcessing(items);
It returns to before ActionBlock
execute, but if I run the worker service it does.
Please advise.
Propagate completion from the processingBlock
to the consumerBlock
. Then only wait for completion on the consumerBlock
.
processingBlock.LinkTo(
consumerBlock,
new DataflowLinkOptions() { PropagateCompletion = true }
);
foreach (var item in items)
{
await processingBlock.SendAsync(item);
}
processingBlock.Complete();
await consumerBlock.Completion;
Also you mention "parallel process" but the emailing is not performed in parallel. To do so you need to set MaxDegreeOfParallelism
on the ExecutionDataflowBlockOptions
. You might as well set EnsureOrdered
to false
also.
var processingBlock = new TransformBlock<ENQueue, IResult>(async email =>
{
// Perform asynchronous processing on the item
}, new ExecutionDataflowBlockOptions
{
EnsureOrdered = false,
MaxDegreeOfParallelism = 10,
});