Nested asynchronous tasks in .NET


I'm writing a script for extending one of my datasets of email data with some features that I have to compute by calling external APIs. I have to use C#, which is not a language I'm very confident with.

Specifically, I have N emails (emails array), each having a mailID and properties contained in a MailData object. For each email, I need to call 3 asynchronous methods (ComputeAttachmentsFeatures, ComputeVirusTotalFeatures, ComputeHeaderFeatures) that call external APIs and add some retrieved data to the mail object.

Moreover, each mail object has 0 or more URLs in the MailURLs attribute. For each of these URLs I have to call 3 more asynchronous methods (ComputeDNSFeatures, ComputeWhoIsFeatures, ComputePageRankFeatures).

I expect that, at each iteration, the first 3 tasks (ComputeAttachmentsFeatures, ComputeVirusTotalFeatures, ComputeHeaderFeatures) and the 3 tasks for each URL (ComputeDNSFeatures, ComputeWhoIsFeatures, ComputePageRankFeatures) run in parallel. Given how I wrote the following code, I expect at most 6 tasks to be running in parallel at any time.

The code below does not work properly: not all tasks are waited for, so the loop moves on to the next mail object before the current one has completed. I tried to use the Task.WaitAll(Task[]) method. What am I doing wrong?

                foreach ((long mailID, MailData mail) in emails)
                {
                    Logger.Info("Mail " + mailID);
                    Task [] mailTasks =
                    {
                        Task.Run(async () => await mail.ComputeAttachmentsFeatures(client_Attachments, mailID)),
                        Task.Run(async () => await mail.ComputeVirusTotalFeatures(client_VT, mailID)),
                        Task.Run(async () => await mail.ComputeHeaderFeatures(client_Headers, mailID))
                    };
                    //For each URL in email, call the APIs
                    foreach (var url in mail.MailURLs)
                    {
                        if (IsValidURL(url.FullHostName))
                        {
                            Task [] urlTasks =
                            {
                                Task.Run(async () => await url.ComputeDNSFeatures(mailID)), // Task.Factory.StartNew
                                Task.Run(async () => await url.ComputeWhoIsFeatures(mailID)),
                                Task.Run(async () => await url.ComputePageRankFeatures(client_PageRank, mailID))
                            };
                            Task.WaitAll(urlTasks);
                        }
                    }
                    Task.WaitAll(mailTasks);
                    Logger.Info("Mail completed " + mailID);
                }

As an example, here is one of the async methods that gets called:

        public async Task<int> ComputeHeaderFeatures(HttpClient client, long mailId)
        {

            //  Blacklists check of the traversed mailservers  -n_smtp_servers_blacklist-
            n_smtp_servers_blacklist = 0;
            foreach (string mail_server in ServersInReceivedHeaders)
            {
                if (!string.IsNullOrEmpty(mail_server) && Program.IsValidURL(mail_server))  // Only analyze it if it's a valid URL or IP
                {
                    // API call to check the mail_server against more than 100 blacklists
                    BlacklistURL alreadyAnalyzedURL = (BlacklistURL)Program.BlacklistedURLs.Find(mail_server);    // Checks if the IP has already been analyzed
                    if (alreadyAnalyzedURL == null)
                    {
                        BlacklistURL blacklistsResult = new BlacklistURL(mail_server);
                        while (true)
                        {
                            try
                            {
                                int statusCode = await BlacklistURL_API.PerformAPICall(blacklistsResult, client);

                                if (statusCode == 429) // Rate Limit Hit
                                {
                                    Program.Logger.Debug($"BlackListChecker - Rate limit hit for mail {mailId} - {mail_server}, will retry after {CapDelayBlackListChecking} s");
                                    Thread.Sleep(CapDelayBlackListChecking * 1000);
                                    continue;
                                }
                                if (statusCode == 503)  // Service temporarily unavailable
                                {
                                    Program.Logger.Debug($"BlackListChecker - Service temporarily Unavailable (503 error), will retry after {DefaultDelay503} s");
                                    Thread.Sleep(DefaultDelay503 * 1000);
                                    continue;  // try again later
                                }
                                Program.BlacklistedURLs.Add(blacklistsResult);  // Adds the server and its result to the list of already analyzed servers
                                if (blacklistsResult.GetFeature() > 0) { n_smtp_servers_blacklist++; }  // If the server appears in at least 1 blacklist, we count it as malicious

                                Program.Logger.Debug($"BlackListChecker call for mail {mailId} - {mail_server} responded with status code {statusCode}");
                            }
                            catch (Exception ex)
                            {
                                Program.Logger.Error($"BlackListChecker - An exception occurred for mail {mailId} - {mail_server}:\n{ex}");
                                throw;
                            }
                            finally
                            {
                                Program.Logger.Debug($"BlackListChecker - Wait {DefaultGapBetweenCallsBlackListChecking} s before next call...");
                                Thread.Sleep(DefaultGapBetweenCallsBlackListChecking * 1000);
                            }
                            // Break the inner loop to proceed with the next argument
                            break;
                        }
                    }
                    else  // The mailserver has already been analyzed, so we take the available result
                    {
                        if (alreadyAnalyzedURL.NBlacklists > 0) { n_smtp_servers_blacklist++; }
                    }
                }
            }
            return 1;
        }
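
A note on the retry loop above: Thread.Sleep blocks the thread that is executing the method, whereas await Task.Delay pauses only the async method and leaves the thread free for other work. The following is a minimal sketch of the same retry idea with Task.Delay. It is not the original method: callApi is a hypothetical stand-in for BlacklistURL_API.PerformAPICall, and the maxAttempts cap and the delay value are assumptions added for illustration (the original loop retries without a limit).

```csharp
using System;
using System.Threading.Tasks;

public static class RetrySketch
{
    // callApi is a hypothetical stand-in for an API call returning an HTTP status code.
    // maxAttempts is an assumption; the method in the question retries indefinitely.
    public static async Task<int> CallWithRetryAsync(
        Func<Task<int>> callApi, TimeSpan retryDelay, int maxAttempts = 5)
    {
        for (int attempt = 1; ; attempt++)
        {
            int statusCode = await callApi();

            // 429 = rate limit hit, 503 = service temporarily unavailable.
            bool retryable = statusCode == 429 || statusCode == 503;
            if (!retryable || attempt >= maxAttempts)
            {
                return statusCode;
            }

            // Non-blocking wait: the thread is released while the delay runs.
            await Task.Delay(retryDelay);
        }
    }

    public static async Task Main()
    {
        int calls = 0;
        // Fake API for the demo: rate-limited twice, then succeeds.
        Func<Task<int>> fakeApi = () => Task.FromResult(++calls < 3 ? 429 : 200);

        int status = await CallWithRetryAsync(fakeApi, TimeSpan.FromMilliseconds(10));
        Console.WriteLine($"status {status} after {calls} calls");
    }
}
```

With the fake API shown in Main, the helper returns on the third call. In the real method the same pattern would apply to each Thread.Sleep, including the one in the finally block.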

Solution

  • As JonasH commented on my question, removing

    Task.Run(async () => await foo())

    and just putting the returned tasks in the task array worked like a charm.

    So the final code would look something like this:

    foreach ((long mailID, MailData mail) in emails) {
        Logger.Info("Mail " + mailID);
        Task [] mailTasks =
        {
            mail.ComputeAttachmentsFeatures(client_Attachments, mailID),
            mail.ComputeVirusTotalFeatures(client_VT, mailID),
            mail.ComputeHeaderFeatures(client_Headers, mailID)
        };
        //For each URL in email, call the APIs
        foreach (var url in mail.MailURLs)
        {
            if (IsValidURL(url.FullHostName))
            {
                Task [] urlTasks =
                {
                   url.ComputeDNSFeatures(mailID), 
                   url.ComputeWhoIsFeatures(mailID),
                   url.ComputePageRankFeatures(client_PageRank, mailID)
                };
                Task.WaitAll(urlTasks);
            }
        }
        Task.WaitAll(mailTasks);
        Logger.Info("Mail completed " + mailID);
    }
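
The same structure can also be written without blocking a thread, by awaiting Task.WhenAll instead of calling Task.WaitAll. The following is a minimal, self-contained sketch under stated assumptions, not the code from the question: FakeApiCall is a hypothetical stand-in for the Compute*Features methods, and the delays, URL names, and the peak-concurrency counter are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class NestedTasksSketch
{
    private static readonly object Gate = new object();
    private static int _running;     // calls currently in flight
    public static int MaxRunning;    // highest number of calls in flight at once

    // Hypothetical stand-in for one Compute*Features call.
    private static async Task<int> FakeApiCall(string name, int delayMs)
    {
        lock (Gate)
        {
            _running++;
            if (_running > MaxRunning) MaxRunning = _running;
        }

        await Task.Delay(delayMs);   // simulates the network wait without blocking a thread

        lock (Gate) { _running--; }
        return 1;
    }

    public static async Task<int> ProcessMailAsync(long mailID, IEnumerable<string> urls)
    {
        int completed = 0;

        // Calling an async method starts it; the returned tasks are kept for later.
        Task<int>[] mailTasks =
        {
            FakeApiCall($"attachments {mailID}", 300),
            FakeApiCall($"virustotal {mailID}", 300),
            FakeApiCall($"headers {mailID}", 300)
        };

        foreach (string url in urls)
        {
            Task<int>[] urlTasks =
            {
                FakeApiCall($"dns {url}", 50),
                FakeApiCall($"whois {url}", 50),
                FakeApiCall($"pagerank {url}", 50)
            };
            // Finish this URL's 3 calls before starting the next URL.
            completed += (await Task.WhenAll(urlTasks)).Length;
        }

        completed += (await Task.WhenAll(mailTasks)).Length;
        return completed;
    }

    public static async Task Main()
    {
        int done = await ProcessMailAsync(1, new[] { "a.example", "b.example" });
        Console.WriteLine($"Completed {done} calls, peak concurrency {MaxRunning}");
    }
}
```

Because each URL's batch is awaited before the next one starts, at most 3 mail calls and 3 URL calls are in flight together, which matches the limit of 6 described in the question. Awaiting requires the enclosing method to be async, so this variant only fits if the calling code can be made async as well.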