I'm writing a script for extending one of my datasets of email data with some features that I have to compute by calling external APIs. I have to use C#, which is not a language I'm very confident with.

Specifically, I have N emails (emails array), each having a mailID and properties contained in a MailData object. For each email, I need to call 3 asynchronous methods (ComputeAttachmentsFeatures, ComputeVirusTotalFeatures, ComputeHeaderFeatures) that call external APIs and add some retrieved data to the mail object.

Moreover, each mail object has 0 or more URLs in the MailURLs attribute. For each of these URLs I have to call 3 more asynchronous methods (ComputeDNSFeatures, ComputeWhoIsFeatures, ComputePageRankFeatures).

I expect that, at each iteration, the first 3 tasks (ComputeAttachmentsFeatures, ComputeVirusTotalFeatures, ComputeHeaderFeatures) run in parallel, along with the 3 tasks for each URL (ComputeDNSFeatures, ComputeWhoIsFeatures, ComputePageRankFeatures). Given how I wrote the code below, I expect at most 6 tasks to be running in parallel at any time.

The code below is not working properly: not all tasks are waited for, and the iteration keeps going before they complete, moving on to the next mail objects. I tried to use the Task.WaitAll(Task[]) method. What am I doing wrong?

foreach ((long mailID, MailData mail) in emails)
{
    Logger.Info("Mail " + mailID);
    Task[] mailTasks =
    {
        Task.Run(async () => await mail.ComputeAttachmentsFeatures(client_Attachments, mailID)),
        Task.Run(async () => await mail.ComputeVirusTotalFeatures(client_VT, mailID)),
        Task.Run(async () => await mail.ComputeHeaderFeatures(client_Headers, mailID))
    };
    // For each URL in the email, call the APIs
    foreach (var url in mail.MailURLs)
    {
        if (IsValidURL(url.FullHostName))
        {
            Task[] urlTasks =
            {
                Task.Run(async () => await url.ComputeDNSFeatures(mailID)), // Task.Factory.StartNew
                Task.Run(async () => await url.ComputeWhoIsFeatures(mailID)),
                Task.Run(async () => await url.ComputePageRankFeatures(client_PageRank, mailID))
            };
            Task.WaitAll(urlTasks);
        }
    }
    Task.WaitAll(mailTasks);
    Logger.Info("Mail completed " + mailID);
}

As an example, I can write one of the async functions that gets called:

public async Task<int> ComputeHeaderFeatures(HttpClient client, long mailId)
        {

            //  Blacklists check of the traversed mailservers  -n_smtp_servers_blacklist-
            n_smtp_servers_blacklist = 0;
            foreach (string mail_server in ServersInReceivedHeaders)
            {
                if (!string.IsNullOrEmpty(mail_server) && Program.IsValidURL(mail_server))  // Only analyze it if it's a valid URL or IP
                {
                    // API call to check the mail_server against more than 100 blacklists
                    BlacklistURL alreadyAnalyzedURL = (BlacklistURL)Program.BlacklistedURLs.Find(mail_server);    // Checks if the IP has already been analyzed
                    if (alreadyAnalyzedURL == null)
                    {
                        BlacklistURL blacklistsResult = new BlacklistURL(mail_server);
                        while (true)
                        {
                            try
                            {
                                int statusCode = await BlacklistURL_API.PerformAPICall(blacklistsResult, client);

                                if (statusCode == 429) // Rate Limit Hit
                                {
                                    Program.Logger.Debug($"BlackListChecker - Rate limit hit for mail {mailId} - {mail_server}, will retry after {CapDelayBlackListChecking} s");
                                    Thread.Sleep(CapDelayBlackListChecking * 1000);
                                    continue;
                                }
                                if (statusCode == 503)  // Service temporarily unavailable
                                {
                                    Program.Logger.Debug($"BlackListChecker - Service temporarily Unavailable (503 error), will retry after {DefaultDelay503} s");
                                    Thread.Sleep(DefaultDelay503 * 1000);
                                    continue;  // try again later
                                }
                                Program.BlacklistedURLs.Add(blacklistsResult);  // Adds the server and its result to the list of already analyzed servers
                                if (blacklistsResult.GetFeature() > 0) { n_smtp_servers_blacklist++; }  // If the server appears in at least 1 blacklist, we count it as malicious

                                Program.Logger.Debug($"BlackListChecker call for mail {mailId} - {mail_server} responded with status code {statusCode}");
                            }
                            catch (Exception ex)
                            {
                                Program.Logger.Error($"BlackListChecker - An exception occurred for mail {mailId} - {mail_server}:\n{ex}");
                                throw;
                            }
                            finally
                            {
                                Program.Logger.Debug($"BlackListChecker - Wait {DefaultGapBetweenCallsBlackListChecking} s before next call...");
                                Thread.Sleep(DefaultGapBetweenCallsBlackListChecking * 1000);
                            }
                            // Break the inner loop to proceed with the next argument
                            break;
                        }
                    }
                    else  // The mailserver has already been analyzed, so we take the available result
                    {
                        if (alreadyAnalyzedURL.NBlacklists > 0) { n_smtp_servers_blacklist++; }
                    }
                }
            }
            return 1;
        }
  • What happens if you remove the Task.Run calls and just start with the await mail. calls? Your surrounding method must be async, and the Task.WaitAll calls should not be necessary at all. Commented Jan 9, 2024 at 11:54
  • You should probably remove Task.Run(async () => await, and just call the asynchronous methods, and put the tasks they return in the tasks-array. Commented Jan 9, 2024 at 12:14
  • I would expect the commented out Task.Factory.StartNew to work incorrectly with async delegates but Task.Run should be fine. Do you have a minimal reproducible example? Commented Jan 9, 2024 at 13:02
  • Could you replace all this complicated code with a minimal example that reproduces the same behavior? Most likely the problem is located in code not included in the question. Start by replacing all async calls with await Task.Delay(1000) one by one. Keep removing code until the problem stops reproducing. Then undo the last code removal, and show us the minimal code that reproduces the problem. Commented Jan 9, 2024 at 14:45
  • Also that // Task.Factory.StartNew comment gives bad vibes. Please search your whole codebase for any instance of uncommented Task.Factory.StartNew calls. This method is highly specialized, and you shouldn't use it unless you are familiar with TPL inside and out. Commented Jan 9, 2024 at 14:51

2 Answers

When using Task.Run, the async () => await ... lambda that is passed in is a Func<Task>, which is not equivalent to a Task, so the argument won't be awaited and only the new Task from the Run method will be awaited. Therefore, it is equivalent to treating the Task as fire and forget.

Try this:

foreach ((long mailID, MailData mail) in emails)
{
    Logger.Info("Mail " + mailID);
    Task[] mailTasks =
    {
        mail.ComputeAttachmentsFeatures(client_Attachments, mailID),
        mail.ComputeVirusTotalFeatures(client_VT, mailID),
        mail.ComputeHeaderFeatures(client_Headers, mailID)
    };
    // For each URL in the email, call the APIs
    foreach (var url in mail.MailURLs)
    {
        if (IsValidURL(url.FullHostName))
        {
            Task[] urlTasks =
            {
                url.ComputeDNSFeatures(mailID), // Task.Factory.StartNew
                url.ComputeWhoIsFeatures(mailID),
                url.ComputePageRankFeatures(client_PageRank, mailID)
            };
            Task.WaitAll(urlTasks);
        }
    }
    Task.WaitAll(mailTasks);
    Logger.Info("Mail completed " + mailID);
}

4 Comments

Posting code with no explanation makes this a bad post.
Task.Run is Task-aware so TBH I would expect this to have no effect.
async () => await is not a Task it is Func<Task>.
The Task.Run returns a Task that represents the result of the asynchronous operation. The asynchronous delegate is invoked on the ThreadPool, as opposed to be invoked on the current thread. Using the Task.Run is fine. It's not a problem.
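The point made in the last comment can be checked with a small console program. This is only a sketch: FakeApiCall is a hypothetical stand-in for the API-calling methods in the question, and the 500 ms delay is arbitrary. Because the lambda is a Func<Task<int>>, Task.Run picks the overload that unwraps the inner task, so waiting on the returned task also waits for the asynchronous work.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class TaskRunUnwrapDemo
{
    // Hypothetical stand-in for one of the API-calling methods.
    private static async Task<int> FakeApiCall()
    {
        await Task.Delay(500);
        return 1;
    }

    public static void Main()
    {
        var stopwatch = Stopwatch.StartNew();

        // The lambda is a Func<Task<int>>, so Task.Run returns a Task<int>
        // that completes only when the inner asynchronous work has finished.
        Task<int> wrapped = Task.Run(async () => await FakeApiCall());
        Task.WaitAll(wrapped);

        Console.WriteLine($"Completed: {wrapped.IsCompleted}, result: {wrapped.Result}");
        // A small tolerance is used because timer resolution varies by platform.
        Console.WriteLine($"Waited for the delay: {stopwatch.ElapsedMilliseconds >= 450}");
    }
}
```

If the wait returned before the delay had elapsed, the task would indeed be fire and forget; with Task.Run and an async lambda that should not happen.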

As JonasH commented on my question, removing

Task.Run(async () => await foo())

and just putting the returned tasks in the tasks-array worked like a charm.

So the final code would look something like this:

foreach ((long mailID, MailData mail) in emails) {
    Logger.Info("Mail " + mailID);
    Task [] mailTasks =
    {
        mail.ComputeAttachmentsFeatures(client_Attachments, mailID),
        mail.ComputeVirusTotalFeatures(client_VT, mailID),
        mail.ComputeHeaderFeatures(client_Headers, mailID)
    };
    //For each URL in email, call the APIs
    foreach (var url in mail.MailURLs)
    {
        if (IsValidURL(url.FullHostName))
        {
            Task [] urlTasks =
            {
               url.ComputeDNSFeatures(mailID), 
               url.ComputeWhoIsFeatures(mailID),
               url.ComputePageRankFeatures(client_PageRank, mailID)
            };
            Task.WaitAll(urlTasks);
        }
    }
    Task.WaitAll(mailTasks);
    Logger.Info("Mail completed " + mailID);
}
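As the first comment on the question suggests, the blocking Task.WaitAll calls can probably be avoided altogether if the surrounding method is async. The following is a minimal sketch under that assumption, not the actual project code: MailFeature, UrlFeature and ProcessMailAsync are hypothetical stand-ins for the Compute*Features methods and the loop body, and the delays simulate API latency.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical stand-in for the mail-level Compute*Features methods.
    private static async Task MailFeature(string name, long mailId)
    {
        await Task.Delay(200);
        Console.WriteLine($"{name} done for mail {mailId}");
    }

    // Hypothetical stand-in for the URL-level Compute*Features methods.
    private static async Task UrlFeature(string name, string url, long mailId)
    {
        await Task.Delay(200);
        Console.WriteLine($"{name} done for {url} (mail {mailId})");
    }

    private static async Task ProcessMailAsync(long mailId, IReadOnlyList<string> urls)
    {
        // Start the three mail-level calls; they run while the URLs are processed.
        var mailTasks = new List<Task>
        {
            MailFeature("Attachments", mailId),
            MailFeature("VirusTotal", mailId),
            MailFeature("Headers", mailId)
        };

        // URLs are handled one at a time, three calls per URL, as in the original loop.
        foreach (string url in urls)
        {
            await Task.WhenAll(
                UrlFeature("DNS", url, mailId),
                UrlFeature("WhoIs", url, mailId),
                UrlFeature("PageRank", url, mailId));
        }

        // The mail counts as complete only once the mail-level calls have finished too.
        await Task.WhenAll(mailTasks);
        Console.WriteLine($"Mail completed {mailId}");
    }

    public static async Task Main()
    {
        // Hypothetical sample data: mail ID mapped to the URLs found in that mail.
        var emails = new Dictionary<long, string[]>
        {
            [1] = new[] { "a.example", "b.example" },
            [2] = Array.Empty<string>()
        };

        foreach (var (mailId, urls) in emails)
        {
            await ProcessMailAsync(mailId, urls);
        }
    }
}
```

One thing to keep in mind with this approach: an async method called directly runs synchronously on the caller's thread until its first incomplete await, and any Thread.Sleep inside it (as in ComputeHeaderFeatures) blocks a thread for the whole wait. Replacing those calls with await Task.Delay(...) is the usual non-blocking equivalent.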
