I have 6 million small (average size about 15 bytes) files that I need to read and further process using the processor. I have previously implemented this using Task.Factory and it worked on asp.net core 2.1 without problems. It took about 20 hours.
I have now migrated the app to asp.net 6 and on the test server, my web application stops responding to any requests after starting these file operations. In the log I see the error System.OutOfMemoryException
.
I suppose my way of implementation is far from ideal. I would like to know what ways of multi-threaded implementation of this work can you suggest?
Method ImportSignatures
from controller:
[HttpPost("ImportSignatures")]
public JsonResult ImportSignatures()
{
try
{
return Json(SignatureImportService.ImportSigningCerts());
}
catch (Exception e)
{
LogsHelper.WriteLog("api/Settings/ImportSignatures", e);
return Json(new ImportSigningCertsResult(e.Message, SignatureImportService.WasCancelled));
}
}
Method ImportSigningCerts
:
public static ImportSigningCertsResult ImportSigningCerts()
{
LogsHelper.WriteEventLog("Launching SignatureImportService");
WasCancelled = false;
IsWorking = true;
ResultStr = "";
totalSignatures = 0;
processedSignatures = 0;
var cancelMsg = "Certificate import was interrupted. \n";
var endMsg = "Certificate import completed successfully. \n";
var toDelete = new List<string>();
try
{
var configuration = SignatureImportConfiguration.FromCfg();
using (s_tokenSource = new CancellationTokenSource())
{
List<string> signatures = Directory.EnumerateFiles(configuration.Path, "*.sig").ToList();
totalSignatures = signatures.Count;
Store mainStore = StoreMan.GetStore("Main");
var importStats = new ImportStats();
var tasks = new List<Task>();
int saveIndex = 1;
const int proccessedForSave = 100000; // After what number of processed signatures to perform intermediate storage and deletion of signatures
CancellationToken token = s_tokenSource.Token;
int minWorkerThreads, minCompletionPortThreads, maxWorkerThreads, maxCompletionPortThreads;
ThreadPool.GetMinThreads(out minWorkerThreads, out minCompletionPortThreads);
ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxCompletionPortThreads);
ThreadPool.SetMaxThreads(minWorkerThreads * 2, maxCompletionPortThreads);
signatures.ForEach(path =>
{
tasks.Add(Task.Factory.StartNew(() =>
{
token.ThrowIfCancellationRequested();
// Here reading of a current file and uploading the necessary certificates to the store from it
if (UploadSigningCerts(mainStore, path, importStats))
{
if (configuration.NeedCleaning)
{
lock (s_toDeleteListLockObj)
toDelete.Add(path);
}
}
// Here intermediate store's saving and deleting proccessed files
lock (s_intermediateSaveLockObj)
{
if (++processedSignatures > proccessedForSave * saveIndex)
{
LogsHelper.WriteEventLog("Intermediate saving of the certificate store...");
mainStore.WriteIfChanged();
StartRemovingSignatures(toDelete);
saveIndex++;
}
}
}, token));
});
try
{
Task.WaitAll(tasks.ToArray());
}
catch (AggregateException ae)
{
foreach (Exception e in ae.InnerExceptions)
{
if (e is not TaskCanceledException)
LogsHelper.WriteLog("SignatureImportService/ImportSigningCerts", e);
}
}
mainStore.WriteIfChanged();
StartRemovingSignatures(toDelete);
ResultStr = (WasCancelled ? cancelMsg : endMsg) + $"Certificates found: {importStats.all}. Was imported: {importStats.imported}." + (importStats.parsingFailed > 0 ? $" Unrecognized files: {importStats.parsingFailed}" : "");
}
LogsHelper.WriteEventLog(ResultStr);
return s_tokenSource == null ? new ImportSigningCertsResult(ResultStr) : new ImportSigningCertsResult(ResultStr, WasCancelled);
}
catch (Exception)
{
throw;
}
finally
{
IsWorking = false;
}
}
Method UploadSigningCerts
:
private static bool UploadSigningCerts(Store store, string path, ImportStats importStats)
{
bool toBeDeleted = true;
CryptoClient client = CryptoServiceContext.DefaultInstance.CryptoClient;
try
{
List<CertInfo> certs = client.GetSignCmsInfo(File.ReadAllBytes(path)).Certs.ToList();
Interlocked.Add(ref importStats.all, certs.Count);
for (int i = 0; i < certs.Count; i++)
{
lock (s_importLockObj)
{
// Validating each certificate from a file, making an import decision, importing to the store...
}
}
return toBeDeleted;
}
catch (Exception e)
{
LogsHelper.WriteLog("SignatureImportService/UploadSigningCerts", e);
LogsHelper.WriteEventLog($"Error importing certificate from signature: {Path.GetFileName(path)};");
Interlocked.Increment(ref importStats.errors);
return false;
}
}
Method StartRemovingSignatures
:
private static void StartRemovingSignatures(List<string> toDelete)
{
if (toDelete.Count > 0)
{
List<string> tempToDelete;
lock (s_toDeleteListLockObj)
{
tempToDelete = new List<string>(toDelete);
toDelete.Clear();
}
LogsHelper.WriteEventLog("Deleting successfully processed signature files...");
Task.Factory.StartNew(() =>
{
tempToDelete.ForEach(path =>
{
try
{
File.Delete(path);
}
catch (Exception e)
{
LogsHelper.WriteLog("ImportResult/DeleteSignatures", e);
}
});
});
}
}
Error's text:
20.08.2023 11:58:01 api/Settings/ImportSignatures
Exception of type 'System.OutOfMemoryException' was thrown.
at System.Threading.Tasks.Task.EnsureContingentPropertiesInitializedUnsafe()
at System.Threading.Tasks.Task.AssignCancellationToken(CancellationToken cancellationToken, Task antecedent, TaskContinuation continuation)
at System.Threading.Tasks.Task.TaskConstructorCore(Delegate action, Object state, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler)
at Store.Services.SignatureImportService.<>c__DisplayClass20_0.<ImportSigningCerts>b__0(String path)
at System.Collections.Generic.List`1.ForEach(Action`1 action)
at Store.Services.SignatureImportService.ImportSigningCerts()
at Store.Controllers.SettingsController.ImportSignatures()
I have fixed the error System.OutOfMemoryException
and I want to share my actions. Key points:
ImportSignatures
(from controller), ImportSigningCerts
and UploadSigningCerts
are now asynchronous;Task
with array accumulation, I use Parallel.ForEachAsync
with MaxDegreeOfParallelism = Environment.ProcessorCount * 3
limitation.Now my code looks like this:
Method ImportSignatures
from controller:
[HttpPost("ImportSignatures")]
public async Task<JsonResult> ImportSignatures()
{
try
{
ImportSigningCertsResult res = await SignatureImportService.ImportSigningCerts();
return Json(res);
}
catch (Exception e)
{
LogsHelper.WriteLog("api/Settings/ImportSignatures", e);
return Json(new ImportSigningCertsResult(e.Message, SignatureImportService.WasCancelled));
}
}
Method ImportSigningCerts
:
public static async Task<ImportSigningCertsResult> ImportSigningCerts()
{
LogsHelper.WriteEventLog("Launching SignatureImportService");
WasCancelled = false;
IsWorking = true;
ResultStr = "";
signaturesCount = 0;
processedSignatures = 0;
var cancelMsg = "Certificate import was interrupted. \n";
var endMsg = "Certificate import completed successfully. \n";
try
{
var configuration = SignatureImportConfiguration.FromCfg();
using (s_tokenSource = new CancellationTokenSource())
{
string[] signatures = Directory.GetFiles(configuration.Path, "*.sig");
signaturesCount = signatures.Length;
Store mainStore = StoreMan.GetStore("Main");
ImportStats importStats = new();
const int partSize = 100000; // After what number of processed signatures to perform intermediate storage and deletion of signatures
var options = new ParallelOptions()
{
CancellationToken = s_tokenSource.Token,
MaxDegreeOfParallelism = Environment.ProcessorCount * 3
};
try
{ // Dividing the total array into parts containing partSize elements in order to perform intermediate saving and deleting of processed files
foreach (string[] signsChunk in signatures.Chunk(partSize))
{
List<string> toDelete = new();
try
{
await Parallel.ForEachAsync(signsChunk, options, async (signPath, _) =>
{
bool canBeRemoved = await UploadSigningCerts(mainStore, signPath, importStats);
if (canBeRemoved && configuration.NeedCleaning)
{
lock (s_toDeleteListLockObj)
toDelete.Add(signPath);
}
Interlocked.Increment(ref processedSignatures);
});
}
finally
{
LogsHelper.WriteEventLog("Intermediate saving of the certificate store...");
mainStore.WriteIfChanged();
StartRemovingSignatures(toDelete);
}
}
}
catch (OperationCanceledException) { }
catch (Exception e)
{
LogsHelper.WriteLog("DeloWebSignatureImportService/ImportSigningCerts:Parallel.ForEachAsync", e);
}
ResultStr = (WasCancelled ? cancelMsg : endMsg) + $"Certificates found: {importStats.all}. Was imported: {importStats.imported}." + (importStats.parsingFailed > 0 ? $" Unrecognized files: {importStats.parsingFailed}" : "");
}
LogsHelper.WriteEventLog(ResultStr);
return s_tokenSource == null ? new ImportSigningCertsResult(ResultStr) : new ImportSigningCertsResult(ResultStr, WasCancelled);
}
finally
{
IsWorking = false;
}
}
Method UploadSigningCerts
:
private static async Task<bool> UploadSigningCerts(Store store, string path, ImportStats importStats)
{
CryptoClient client = CryptoServiceContext.DefaultInstance.CryptoClient;
bool toBeDeleted = true;
byte[] signatureData = await File.ReadAllBytesAsync(path);
CertInfo[] certs = Array.Empty<CertInfo>();
try
{
certs = client.GetSignCmsInfo(signatureData).Certs;
Interlocked.Add(ref importStats.all, certs.Length);
for (int i = 0; i < certs.Length; i++)
{
lock (s_importLockObj)
{
// Validating each certificate from a signature, making an import decision, importing to the store...
}
}
return toBeDeleted;
}
catch (Exception e)
{
LogsHelper.WriteLog("SignatureImportService/UploadSigningCerts", e);
LogsHelper.WriteEventLog($"Error importing certificate from signature: {Path.GetFileName(path)};");
Interlocked.Increment(ref importStats.errors);
return false;
}
}
Everything else remained unchanged. Thanks to everyone who wrote comments, you guided me on the right path))