Tags: c#, sql, datatable, sqldatareader

DataTable "Array dimensions exceeded supported range" when requesting dataTable.NewRow()


For some insane reason, I'm getting an OutOfMemoryException while streaming data to SQL in sensible chunks, and barely using any memory at all:

System.OutOfMemoryException: Exception of type 'System.OutOfMemoryException' was thrown.
   at System.Data.DataTable.NewRowArray(Int32 size)
   at System.Data.RecordManager.GrowRecordCapacity()
   at System.Data.RecordManager.NewRecordBase()
   at System.Data.DataTable.NewRecord(Int32 sourceRecord)
   at Company.PA.Data.PADbContext.<StreamedSqlBulkCopy>d__22`1.MoveNext() in D:\Agent_A\_work\7\s\Company.PA.DataLayer\Company.PA.Data\BulkInsert\StreamedSqlBulkCopy.cs:line 46

The error occurs when calling dataTable.NewRow() inside the while loop below, once I get past about the 30 millionth row:

/// <summary>Helper to stream a large number of records into SQL without
/// ever having to materialize the entire enumerable into memory at once.</summary>
/// <param name="destinationTableName">The name of the table in the database to copy data to.</param>
/// <param name="dataTable">A new instance of the DataTable class that matches the schema of the table to insert to.
/// This should match exactly (same column names) what is in SQL, for automatic column mapping to work.</param>
/// <param name="sourceData">The enumerable of data that will be used to generate DataRows</param>
/// <param name="populateRow">A delegate function that populates and returns a new data row for a given record.</param>
/// <param name="memoryBatchSize">The number of DataRows to generate in memory before passing them to SqlBulkCopy</param>
/// <param name="insertBatchSize">The batch size of inserts performed by SqlBulkCopy utility.</param>
public async Task StreamedSqlBulkCopy<T>(
    string destinationTableName, DataTable dataTable,
    IEnumerable<T> sourceData, Func<T, DataRow, DataRow> populateRow,
    int memoryBatchSize = 1000000, int insertBatchSize = 5000)
{
    using (SqlConnection connection = new SqlConnection(Database.Connection.ConnectionString))
    {
        connection.Open();
        using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
        using (IEnumerator<T> enumerator = sourceData.GetEnumerator())
        {
            // Configure the single SqlBulkCopy instance that will be used to copy all "batches"
            bulkCopy.DestinationTableName = destinationTableName;
            bulkCopy.BatchSize = insertBatchSize;
            bulkCopy.BulkCopyTimeout = _bulkInsertTimeOut;
            foreach (DataColumn column in dataTable.Columns)
                bulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
            // Begin enumerating over all records, preparing batches no larger than "memoryBatchSize"
            bool hasNext = true;
            while (hasNext)
            {
                DataRow[] batch = new DataRow[memoryBatchSize];
                int filled = 0;
                // Check the batch size before advancing the enumerator, so an element is never
                // consumed by MoveNext() and then silently dropped at a batch boundary
                while (filled < memoryBatchSize && (hasNext = enumerator.MoveNext()))
                    batch[filled++] = populateRow(enumerator.Current, dataTable.NewRow());
                // When we reach the end of the enumerable, we need to shrink the final buffer array
                if (filled < memoryBatchSize)
                    Array.Resize(ref batch, filled);
                if (filled > 0)
                    await bulkCopy.WriteToServerAsync(batch);
            }
        }
    }
}

As is hopefully clear, the purpose of the above helper is to stream a (very large) IEnumerable<T> of data into a SQL table using SqlBulkCopy and a delegate that fills a row for each element.

Sample usage is:

public async Task SaveExchangeRates(List<FxRate> fxRates)
{
    var createDate = DateTimeOffset.UtcNow;
    await StreamedSqlBulkCopy("RefData.ExchangeRate",
        GetExchangeRateDataTable(), fxRates, (fx, newRow) =>
        {
            newRow["BaseCurrency"] = "USD";
            newRow["TargetCurrency"] = fx.CurrencyCode;
            newRow["ExchangeDate"] = fx.ExchangeRateDate;
            newRow["DollarValue"] = fx.ValueInUsd;
            return newRow;
        });
}

private DataTable GetExchangeRateDataTable()
{
    var dataTable = new DataTable();
    dataTable.Columns.Add("ExchangeDate", typeof(DateTime));
    dataTable.Columns.Add("BaseCurrency", typeof(string));
    dataTable.Columns.Add("TargetCurrency", typeof(string));
    dataTable.Columns.Add("DollarValue", typeof(double));
    return dataTable;
}
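
Note that because sourceData is only enumerated inside the helper, one memoryBatchSize chunk at a time, the source doesn't have to be a pre-built List<FxRate> - it can just as well be a lazily produced sequence. A rough sketch of what that might look like (GenerateRates is hypothetical, and it assumes FxRate has settable CurrencyCode, ExchangeRateDate and ValueInUsd properties):

// Hypothetical lazy source: records are produced one at a time via yield return,
// so the full data set never has to be materialized in memory before bulk copying.
private IEnumerable<FxRate> GenerateRates(int count)
{
    var random = new Random(42);
    for (int i = 0; i < count; i++)
    {
        yield return new FxRate
        {
            CurrencyCode = "EUR",
            ExchangeRateDate = DateTime.UtcNow.Date.AddDays(-i),
            ValueInUsd = random.NextDouble()
        };
    }
}

Passing GenerateRates(50_000_000) in place of fxRates above would then stream all fifty million records while only ever holding one memoryBatchSize batch of DataRows in memory at a time.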

Solution

  • Another approach, simpler (but at the cost of some additional overhead), is to accept our fate and use the DataTable class itself rather than an array of DataRow, but to create Clone() copies of the original table periodically so we never run into the apparent hard maximum of 16,777,216 rows per table.

    I hadn't appreciated that a DataTable keeps an internal record array for every row you create with NewRow(), even if those rows never end up getting added to the table - so we might as well take advantage of that storage rather than allocate our own array of DataRow (a short sketch illustrating this behaviour follows the code below).

    Some of the overhead of using a DataTable can be offset by setting its initial capacity up front, so its internal storage never has to grow (and reallocate), and by suspending notifications, index maintenance and constraint checking with BeginLoadData().

    Relevant change below:

    bool hasNext = true;
    while (hasNext)
    {
        using (DataTable tableChunk = dataTable.Clone())
        {
            tableChunk.MinimumCapacity = memoryBatchSize + 1; // Avoid triggering resizing
            tableChunk.BeginLoadData(); // Speeds up inserting a large volume of rows a little
            int filled = 0;
            // Check the batch size before advancing the enumerator, so an element is never
            // consumed by MoveNext() and then silently dropped at a batch boundary
            while (filled++ < memoryBatchSize && (hasNext = enumerator.MoveNext()))
                tableChunk.Rows.Add(populateRow(enumerator.Current, tableChunk.NewRow()));
            if (tableChunk.Rows.Count > 0)
                await bulkCopy.WriteToServerAsync(tableChunk);
        }
    }
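
    To see why the original version ran out of memory even though hardly any rows were kept alive, here is a minimal, self-contained sketch (illustrative only, not part of the original code) of the behaviour described above - NewRow() keeps reserving slots in the table's internal record storage even when the rows are never added:

    using System;
    using System.Data;

    class NewRowGrowthDemo
    {
        static void Main()
        {
            var table = new DataTable();
            table.Columns.Add("Value", typeof(int));

            // Each NewRow() call reserves a record in the table's internal record array.
            // The rows below are never added via table.Rows.Add, yet the reserved records
            // are not reclaimed, so memory climbs with every iteration; in the scenario
            // above this eventually surfaced as an OutOfMemoryException thrown from
            // DataTable.NewRowArray, somewhere past the 30 millionth row.
            for (int i = 0; i < 40_000_000; i++)
            {
                DataRow row = table.NewRow();
                row["Value"] = i;
                // intentionally discarded without adding it to table.Rows
            }
        }
    }

    This is also why cloning a fresh chunk per batch works: Clone() copies only the schema, so every chunk starts with an empty record array, and once a chunk goes out of scope after WriteToServerAsync its records become eligible for garbage collection.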