Original problem: the CSV file is too big (700k records), so I am looking to create smaller CSV files from that big CSV file.
I have the following code to dissect the file and create smaller files.
private async Task SplitFile(List<CsvRow> rows, string name)
{
    var numRows = 30000;
    var remainder = rows.Count() % numRows;
    var chunks = rows.Count() / numRows;
    if (remainder > 0)
    {
        chunks++;
    }
    // Iterate rows in chunks
    for (var row = 0; row < chunks; row++)
    {
        // Extract chunks using LINQ
        var fileRows = rows
            .Skip(row * numRows)
            .Take(numRows)
            .ToList();
        var outputPath = Path.Combine(@"c:\", $"file{row}.txt");
        var encoding = new UTF8Encoding(true);
        await using var mem = new MemoryStream();
        await using var fileWriter = new StreamWriter(outputPath, false, encoding);
        await using var writer = new StreamWriter(mem, encoding);
        await using var csvBlob = new CsvWriter(writer, CultureInfo.InvariantCulture);
        await using var csvFile = new CsvWriter(fileWriter, CultureInfo.InvariantCulture);
        await csvFile.WriteRecordsAsync(fileRows);
        await csvBlob.WriteRecordsAsync(fileRows);
        FileStream file = new FileStream(@$"c:\memfile{row}.txt", FileMode.Create,
            FileAccess.Write);
        mem.WriteTo(file);
        file.Close();
    }
}
Blocker - I am downloading the original big file from an Azure Blob container, and after creating the small chunks I am going to upload them back to the blob container. For that I need to have the data in MemoryStreams.
I am creating physical files just to work out the issue I am having with memory streams. They are easier to debug.
When I run the above code, the small chunk files are created. You will notice I am creating two sets of files (chunks):
first by writing the data directly into a FileStream, and second by using the MemoryStream I have created.
I get 30000 records in the file that is created by writing directly to the FileStream, but in the second file I only get 29889 records.
I tried everything, but I am unable to get all 30000 records as soon as I use a MemoryStream.
I flushed the stream and fiddled with the encoding, but nothing is helping. I read about UTF-8 with BOM, which looked promising, but again I was unable to work that out.
I am using .NET Core 3.1.
Is there a known issue with MemoryStream? Why is it losing the last few records? The rest of the files are the same.
Any ideas?
Thanks
As I commented above, the fix is to call Flush on the CsvWriter prior to copying the MemoryStream. The issue is that there is pending data still sitting in the CsvWriter's internal buffers that doesn't get copied to the MemoryStream until you Flush it. That should get things working for you.
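The buffering effect can be reproduced without CsvHelper at all. The following is a minimal sketch using only a StreamWriter over a MemoryStream; the class and method names (FlushDemo, BytesVisible) are made up for illustration. In the code from the question, the equivalent step would be flushing csvBlob (which in turn flushes its underlying writer) before calling mem.WriteTo(file).

```csharp
using System;
using System.IO;
using System.Text;

static class FlushDemo
{
    // Writes `rows` lines through a buffered StreamWriter and reports how many
    // bytes have actually reached the MemoryStream, optionally flushing first.
    public static long BytesVisible(int rows, bool flush)
    {
        using var mem = new MemoryStream();
        using var writer = new StreamWriter(mem, new UTF8Encoding(true));
        for (var i = 0; i < rows; i++)
        {
            writer.WriteLine($"{i},value{i}");
        }
        if (flush)
        {
            // Push whatever is still sitting in the writer's buffer into the stream.
            writer.Flush();
        }
        // Read the length before the writer is disposed (disposal would flush too).
        return mem.Length;
    }

    public static void Main()
    {
        Console.WriteLine(BytesVisible(30000, flush: false)); // tail of the data is missing
        Console.WriteLine(BytesVisible(30000, flush: true));  // complete data
    }
}
```

The unflushed count comes out smaller because the last partial buffer never reached the stream, which matches the symptom of a few records missing at the end of each chunk.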
However, I have deeper feedback for your scenario. It appears that you are reading the entire 700K file into a List<CsvRow> before you process the batches. A better way would be to stream the CSV data from Azure and, as you are reading it, send the smaller batches back to Azure.
In this example I'm going to use my own library (Sylvan.Data.Csv), but I'm sure CsvHelper provides similar capabilities.
using Sylvan.Data.Csv;
...
string name = "MyMassiveCsv";
// Dispose the reader when done so the underlying file handle is released.
using TextReader reader = File.OpenText(name + ".csv");
// replace above with however you access your Azure blob streams.
CsvDataReader csv = await CsvDataReader.CreateAsync(reader);
RangeDataReader r;
int i = 0;
do
{
    // Each RangeDataReader exposes at most the next 30000 rows of the shared CSV reader.
    r = new RangeDataReader(csv, 30000);
    i++;
    using var writer = File.CreateText(name + i + ".csv");
    // using var writer = new StreamWriter(CreateAzureBlob("batch" + i));
    using var w = CsvDataWriter.Create(writer);
    await w.WriteAsync(r);
} while (!r.AtEndOfData);
This way you will only need to hold a small amount of the CSV file in memory at one time, and you will start sending back batches immediately instead of having to download the entire CSV first.
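To make the streaming idea concrete without any CSV library, here is a rough sketch that splits on raw lines using only the BCL. It is a swapped-in, simpler technique, not what Sylvan or CsvHelper do internally: it assumes the first line is a header and that no field contains an embedded newline. The names LineSplitter, Split and openChunk are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

static class LineSplitter
{
    // Streams `input` line by line and writes chunks of at most `rowsPerChunk`
    // data rows, repeating the header line at the top of every chunk.
    // `openChunk` is a caller-supplied factory that returns the writer for chunk N.
    // Returns the number of chunks written.
    public static int Split(TextReader input, int rowsPerChunk, Func<int, TextWriter> openChunk)
    {
        var header = input.ReadLine();
        if (header == null) return 0;

        var chunk = 0;
        var rowsInChunk = 0;
        TextWriter writer = null;
        try
        {
            string line;
            while ((line = input.ReadLine()) != null)
            {
                if (writer == null)
                {
                    // Open the next chunk lazily so no empty trailing chunk is created.
                    writer = openChunk(++chunk);
                    writer.WriteLine(header);
                    rowsInChunk = 0;
                }
                writer.WriteLine(line);
                if (++rowsInChunk == rowsPerChunk)
                {
                    writer.Dispose(); // disposing flushes the finished chunk
                    writer = null;
                }
            }
        }
        finally
        {
            writer?.Dispose();
        }
        return chunk;
    }

    public static void Main()
    {
        var chunks = new List<StringWriter>();
        var input = new StringReader("id,name\n1,a\n2,b\n3,c\n4,d\n5,e\n");
        var written = Split(input, 2, n =>
        {
            var w = new StringWriter();
            chunks.Add(w);
            return w;
        });
        Console.WriteLine(written); // 3 chunks: 2 + 2 + 1 data rows
    }
}
```

Only one line and one open writer are held at a time, so memory use does not grow with the size of the input. For real CSV data with quoted fields, a proper CSV reader is the safer choice.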
RangeDataReader is a DbDataReader implementation that wraps a DbDataReader and limits the number of rows that it reads from the underlying reader. The implementation is the following:
using System;
using System.Collections;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
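class RangeDataReader : DbDataReader
{
    readonly DbDataReader reader;
    // Rows handed out so far. Starting at 0 caps each batch at exactly `count` rows
    // (starting at -1 would let count + 1 rows through).
    int row = 0;
    int count;
    public RangeDataReader(DbDataReader dataReader, int count)
    {
        this.reader = dataReader;
        this.count = count;
    }
    // True once the underlying reader has run out of rows.
    public bool AtEndOfData { get; private set; }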
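    public override async Task<bool> ReadAsync(CancellationToken cancellationToken)
    {
        if (row < count)
        {
            row++;
            var r = await reader.ReadAsync(cancellationToken);
            if (!r)
            {
                // The underlying reader is exhausted; tell the caller to stop batching.
                AtEndOfData = true;
            }
            return r;
        }
        // Batch limit reached: stop without consuming a row from the underlying reader.
        return false;
    }
    public override bool Read()
    {
        if (row < count)
        {
            row++;
            var r = reader.Read();
            if (!r)
            {
                // The underlying reader is exhausted; tell the caller to stop batching.
                AtEndOfData = true;
            }
            return r;
        }
        // Batch limit reached: stop without consuming a row from the underlying reader.
        return false;
    }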
    // Everything below simply delegates to the wrapped reader.
    public override object this[int ordinal] => this.GetValue(ordinal);
    public override object this[string name] => this.GetValue(GetOrdinal(name));
    public override int Depth => 0;
    public override int FieldCount => reader.FieldCount;
    public override bool HasRows => reader.HasRows;
    public override bool IsClosed => reader.IsClosed;
    public override int RecordsAffected => reader.RecordsAffected;
    public override bool GetBoolean(int ordinal) => reader.GetBoolean(ordinal);
    public override byte GetByte(int ordinal) => reader.GetByte(ordinal);
    public override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
        => reader.GetBytes(ordinal, dataOffset, buffer, bufferOffset, length);
    public override char GetChar(int ordinal) => reader.GetChar(ordinal);
    public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
        => reader.GetChars(ordinal, dataOffset, buffer, bufferOffset, length);
    public override string GetDataTypeName(int ordinal) => reader.GetDataTypeName(ordinal);
    public override DateTime GetDateTime(int ordinal) => reader.GetDateTime(ordinal);
    public override decimal GetDecimal(int ordinal) => reader.GetDecimal(ordinal);
    public override double GetDouble(int ordinal) => reader.GetDouble(ordinal);
    public override IEnumerator GetEnumerator() => new DbEnumerator(this);
    public override Type GetFieldType(int ordinal) => reader.GetFieldType(ordinal);
    public override float GetFloat(int ordinal) => reader.GetFloat(ordinal);
    public override Guid GetGuid(int ordinal) => reader.GetGuid(ordinal);
    public override short GetInt16(int ordinal) => reader.GetInt16(ordinal);
    public override int GetInt32(int ordinal) => reader.GetInt32(ordinal);
    public override long GetInt64(int ordinal) => reader.GetInt64(ordinal);
    public override string GetName(int ordinal) => reader.GetName(ordinal);
    public override int GetOrdinal(string name) => reader.GetOrdinal(name);
    public override string GetString(int ordinal) => reader.GetString(ordinal);
    public override object GetValue(int ordinal) => reader.GetValue(ordinal);
    public override int GetValues(object[] values) => reader.GetValues(values);
    public override bool IsDBNull(int ordinal) => reader.IsDBNull(ordinal);
    // A range reader exposes a single result set only.
    public override bool NextResult() => throw new NotSupportedException();
}
Pretty much everything just delegates to the internal data reader. The only interesting bit is Read/ReadAsync, which is where it limits the number of rows it will read. I haven't tested this code thoroughly, and looking at it now I might be off by one in the number of rows it will read.
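The row-count boundary is easy to check in isolation. This small sketch models only the gate used in Read/ReadAsync, `if (row < count) { row++; ... }`, and counts how many reads it lets through for a given starting value of row. RangeGate and RowsAdmitted are illustrative names, not part of any library.

```csharp
using System;

static class RangeGate
{
    // Counts how many times the gate `if (row < count) { row++; ... }` opens
    // before it starts returning false, for a given initial value of `row`.
    public static int RowsAdmitted(int initialRow, int count)
    {
        var row = initialRow;
        var admitted = 0;
        while (row < count)
        {
            row++;
            admitted++;
        }
        return admitted;
    }

    public static void Main()
    {
        Console.WriteLine(RowsAdmitted(-1, 30000)); // 30001: one row too many
        Console.WriteLine(RowsAdmitted(0, 30000));  // 30000: exactly the batch size
    }
}
```

So a counter that starts at -1 admits count + 1 rows per batch, while one that starts at 0 admits exactly count.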
Finally, now that I've illustrated how you can stream-process the CSV data, perhaps the need for splitting evaporates and you can simply stream-process the file rather than splitting it? It's hard to know without knowing more about why you feel you need to split it.