Search code examples
c#asp.net-mvctaskout-of-memory.net-6.0

Using millions of Tasks leads to the error System.OutOfMemoryException (C#)


I have 6 million small (average size about 15 bytes) files that I need to read and further process using the processor. I have previously implemented this using Task.Factory and it worked on asp.net core 2.1 without problems. It took about 20 hours.

I have now migrated the app to asp.net 6 and on the test server, my web application stops responding to any requests after starting these file operations. In the log I see the error System.OutOfMemoryException.

I suppose my way of implementation is far from ideal. I would like to know what ways of multi-threaded implementation of this work can you suggest?

Method ImportSignatures from controller:

[HttpPost("ImportSignatures")]
public JsonResult ImportSignatures()
{
    try
    {
        return Json(SignatureImportService.ImportSigningCerts());
    }
    catch (Exception e)
    {
        LogsHelper.WriteLog("api/Settings/ImportSignatures", e);
        return Json(new ImportSigningCertsResult(e.Message, SignatureImportService.WasCancelled));
    }
}

Method ImportSigningCerts:

public static ImportSigningCertsResult ImportSigningCerts()
{
    LogsHelper.WriteEventLog("Launching SignatureImportService");
    WasCancelled = false;
    IsWorking = true;
    ResultStr = "";
    totalSignatures = 0;
    processedSignatures = 0;

    var cancelMsg = "Certificate import was interrupted. \n";
    var endMsg = "Certificate import completed successfully. \n";
    var toDelete = new List<string>();

    try
    {
        var configuration = SignatureImportConfiguration.FromCfg();

        using (s_tokenSource = new CancellationTokenSource())
        {
            List<string> signatures = Directory.EnumerateFiles(configuration.Path, "*.sig").ToList();
            totalSignatures = signatures.Count;

            Store mainStore = StoreMan.GetStore("Main");
            var importStats = new ImportStats();
            var tasks = new List<Task>();

            int saveIndex = 1;
            const int proccessedForSave = 100000; // After what number of processed signatures to perform intermediate storage and deletion of signatures
            CancellationToken token = s_tokenSource.Token;

            int minWorkerThreads, minCompletionPortThreads, maxWorkerThreads, maxCompletionPortThreads;
            ThreadPool.GetMinThreads(out minWorkerThreads, out minCompletionPortThreads);
            ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxCompletionPortThreads);
            ThreadPool.SetMaxThreads(minWorkerThreads * 2, maxCompletionPortThreads);

            signatures.ForEach(path =>
            {
                tasks.Add(Task.Factory.StartNew(() =>
                {
                    token.ThrowIfCancellationRequested();

                    // Here reading of a current file and uploading the necessary certificates to the store from it
                    if (UploadSigningCerts(mainStore, path, importStats))
                    {
                        if (configuration.NeedCleaning)
                        {
                            lock (s_toDeleteListLockObj)
                                toDelete.Add(path);
                        }
                    }

                    // Here intermediate store's saving and deleting proccessed files
                    lock (s_intermediateSaveLockObj)
                    {
                        if (++processedSignatures > proccessedForSave * saveIndex)
                        {
                            LogsHelper.WriteEventLog("Intermediate saving of the certificate store...");

                            mainStore.WriteIfChanged();
                            StartRemovingSignatures(toDelete);
                            saveIndex++;
                        }
                    }
                }, token));
            });

            try
            {
                Task.WaitAll(tasks.ToArray());
            }
            catch (AggregateException ae)
            {
                foreach (Exception e in ae.InnerExceptions)
                {
                    if (e is not TaskCanceledException)
                        LogsHelper.WriteLog("SignatureImportService/ImportSigningCerts", e);
                }
            }
            mainStore.WriteIfChanged();
            StartRemovingSignatures(toDelete);
            ResultStr = (WasCancelled ? cancelMsg : endMsg) + $"Certificates found: {importStats.all}. Was imported: {importStats.imported}." + (importStats.parsingFailed > 0 ? $" Unrecognized files: {importStats.parsingFailed}" : "");
        }

        LogsHelper.WriteEventLog(ResultStr);
        return s_tokenSource == null ? new ImportSigningCertsResult(ResultStr) : new ImportSigningCertsResult(ResultStr, WasCancelled);
    }
    catch (Exception)
    {
        throw;
    }
    finally
    {
        IsWorking = false;
    }
}

Method UploadSigningCerts:

private static bool UploadSigningCerts(Store store, string path, ImportStats importStats)
{
    bool toBeDeleted = true;
    CryptoClient client = CryptoServiceContext.DefaultInstance.CryptoClient;

    try
    {
        List<CertInfo> certs = client.GetSignCmsInfo(File.ReadAllBytes(path)).Certs.ToList();

        Interlocked.Add(ref importStats.all, certs.Count);

        for (int i = 0; i < certs.Count; i++)
        {
            lock (s_importLockObj)
            {
                // Validating each certificate from a file, making an import decision, importing to the store...
            }
        }
        return toBeDeleted;
    }
    catch (Exception e)
    {
        LogsHelper.WriteLog("SignatureImportService/UploadSigningCerts", e);
        LogsHelper.WriteEventLog($"Error importing certificate from signature: {Path.GetFileName(path)};");
        Interlocked.Increment(ref importStats.errors);
        return false;
    }
}

Method StartRemovingSignatures:

private static void StartRemovingSignatures(List<string> toDelete)
{
    if (toDelete.Count > 0)
    {
        List<string> tempToDelete;
        lock (s_toDeleteListLockObj)
        {
            tempToDelete = new List<string>(toDelete);
            toDelete.Clear();
        }

        LogsHelper.WriteEventLog("Deleting successfully processed signature files...");

        Task.Factory.StartNew(() =>
        {
            tempToDelete.ForEach(path =>
            {
                try
                {
                    File.Delete(path);
                }
                catch (Exception e)
                {
                    LogsHelper.WriteLog("ImportResult/DeleteSignatures", e);
                }
            });
        });
    }
}

Error's text:

20.08.2023 11:58:01 api/Settings/ImportSignatures
Exception of type 'System.OutOfMemoryException' was thrown.
   at System.Threading.Tasks.Task.EnsureContingentPropertiesInitializedUnsafe()
   at System.Threading.Tasks.Task.AssignCancellationToken(CancellationToken cancellationToken, Task antecedent, TaskContinuation continuation)
   at System.Threading.Tasks.Task.TaskConstructorCore(Delegate action, Object state, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler)
   at Store.Services.SignatureImportService.<>c__DisplayClass20_0.<ImportSigningCerts>b__0(String path)
   at System.Collections.Generic.List`1.ForEach(Action`1 action)
   at Store.Services.SignatureImportService.ImportSigningCerts()
   at Store.Controllers.SettingsController.ImportSignatures()

Solution

  • I have fixed the error System.OutOfMemoryException and I want to share my actions. Key points:

    • I refactored the code, methods ImportSignatures (from controller), ImportSigningCerts and UploadSigningCerts are now asynchronous;
    • Instead of using Task with array accumulation, I use Parallel.ForEachAsync with MaxDegreeOfParallelism = Environment.ProcessorCount * 3 limitation.

    Now my code looks like this: Method ImportSignatures from controller:

    [HttpPost("ImportSignatures")]
    public async Task<JsonResult> ImportSignatures()
    {
        try
        {
            ImportSigningCertsResult res = await SignatureImportService.ImportSigningCerts();
            return Json(res);
        }
        catch (Exception e)
        {
            LogsHelper.WriteLog("api/Settings/ImportSignatures", e);
            return Json(new ImportSigningCertsResult(e.Message, SignatureImportService.WasCancelled));
        }
    }
    

    Method ImportSigningCerts:

    public static async Task<ImportSigningCertsResult> ImportSigningCerts()
    {
        LogsHelper.WriteEventLog("Launching SignatureImportService");
    
        WasCancelled = false;
        IsWorking = true;
        ResultStr = "";
        signaturesCount = 0;
        processedSignatures = 0;
    
        var cancelMsg = "Certificate import was interrupted. \n";
        var endMsg = "Certificate import completed successfully. \n";
    
        try
        {
            var configuration = SignatureImportConfiguration.FromCfg();
    
            using (s_tokenSource = new CancellationTokenSource())
            {
                string[] signatures = Directory.GetFiles(configuration.Path, "*.sig");
                signaturesCount = signatures.Length;
    
                Store mainStore = StoreMan.GetStore("Main");
                ImportStats importStats = new();
    
                const int partSize = 100000; // After what number of processed signatures to perform intermediate storage and deletion of signatures
    
                var options = new ParallelOptions()
                {
                    CancellationToken = s_tokenSource.Token,
                    MaxDegreeOfParallelism = Environment.ProcessorCount * 3
                };
    
                try
                {   // Dividing the total array into parts containing partSize elements in order to perform intermediate saving and deleting of processed files
                    foreach (string[] signsChunk in signatures.Chunk(partSize))
                    {
                        List<string> toDelete = new();
                        try
                        {
                            await Parallel.ForEachAsync(signsChunk, options, async (signPath, _) =>
                            {
                                bool canBeRemoved = await UploadSigningCerts(mainStore, signPath, importStats);
                                if (canBeRemoved && configuration.NeedCleaning)
                                {
                                    lock (s_toDeleteListLockObj)
                                        toDelete.Add(signPath);
                                }
                                Interlocked.Increment(ref processedSignatures);
                            });
                        }
                        finally
                        {
                            LogsHelper.WriteEventLog("Intermediate saving of the certificate store...");
    
                            mainStore.WriteIfChanged();
                            StartRemovingSignatures(toDelete);
                        }
                    }
                }
                catch (OperationCanceledException) { }
                catch (Exception e)
                {
                    LogsHelper.WriteLog("DeloWebSignatureImportService/ImportSigningCerts:Parallel.ForEachAsync", e);
                }
    
                ResultStr = (WasCancelled ? cancelMsg : endMsg) + $"Certificates found: {importStats.all}. Was imported: {importStats.imported}." + (importStats.parsingFailed > 0 ? $" Unrecognized files: {importStats.parsingFailed}" : "");
            }
    
            LogsHelper.WriteEventLog(ResultStr);
            return s_tokenSource == null ? new ImportSigningCertsResult(ResultStr) : new ImportSigningCertsResult(ResultStr, WasCancelled);
        }
        finally
        {
            IsWorking = false;
        }
    }
    

    Method UploadSigningCerts:

    private static async Task<bool> UploadSigningCerts(Store store, string path, ImportStats importStats)
    {
        CryptoClient client = CryptoServiceContext.DefaultInstance.CryptoClient;
    
        bool toBeDeleted = true;
        byte[] signatureData = await File.ReadAllBytesAsync(path);
        CertInfo[] certs = Array.Empty<CertInfo>();
    
        try
        {
            certs = client.GetSignCmsInfo(signatureData).Certs;
    
            Interlocked.Add(ref importStats.all, certs.Length);
    
            for (int i = 0; i < certs.Length; i++)
            {
                lock (s_importLockObj)
                {
                    // Validating each certificate from a signature, making an import decision, importing to the store...
                }
            }
            return toBeDeleted;
        }
        catch (Exception e)
        {
            LogsHelper.WriteLog("SignatureImportService/UploadSigningCerts", e);
            LogsHelper.WriteEventLog($"Error importing certificate from signature: {Path.GetFileName(path)};");
            Interlocked.Increment(ref importStats.errors);
            return false;
        }
    }
    

    Everything else remained unchanged. Thanks to everyone who wrote comments, you guided me on the right path))