I developed a CSV batch writer. But the process seems to be pretty slow compared to the BCP. The only requirement I have is to export big tables without identity or primary key columns into multiple small sized CSV files and name them with corresponding batch id.
The issue with BCP is that it will only write to a single big file.
What my current process does is: it reads the data and writes it into a memory stream using a CSV writer. I continuously check whether the memory stream has grown beyond a particular batch size; if so, I asynchronously copy the memory stream's contents out to a text file.
With a batch size of 250 MB per file, I am able to export without out-of-memory exceptions.
But this process takes 5 times more time compared to BCP export.
Is there any better way to achieve batch exporting to CSV than what I am currently doing?
Please advise.
A couple options come to mind:
If the source query is able to be batched in SQL Server easily (e.g. a clustered index that you can key off of), FETCH and OFFSET are basically free.
If the table is a heap, FETCH/OFFSET are not really an option, but you might consider adding a clustered index, since there are not many good arguments against doing so (though doing so for a 100 GB table would be expensive).
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 0 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch1.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 20000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch2.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 40000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch3.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 60000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch4.csv -S Server -U sa -P Password -w
From measurements using a table of ~1.2 GB, a naïve implementation of a C# CSV SQL Export (below) achieves 75% of the performance of BCP on the same table and system. (It also has the benefit of handling the CSV format correctly regarding embedded commas, quotes, and CRLF's).
static void Main(string[] args)
{
    // Export dbo.Table to out.csv, handling the CSV format per RFC 4180
    // (embedded commas, quotes and CRLFs are quoted/escaped correctly).
    // Connection and command are disposed deterministically; the original
    // leaked both.
    using (var con = new SqlConnection(@"Server=(local);Database=Demo;User Id=sa;Password=bar;"))
    {
        con.Open();
        // Hoisted out of the per-value loop: allocating this array for every
        // field was pure GC pressure in the hot path.
        var mustQuote = new[] { '"', ',', '\r', '\n' };
        using (var sqr = new SqlCommand("SELECT * FROM dbo.Table", con))
        using (var reader = sqr.ExecuteReader())
        using (var tw = File.CreateText("out.csv"))
        {
            while (reader.Read())
            {
                for (int i = 0; i < reader.FieldCount; i++)
                {
                    if (i != 0)
                    {
                        tw.Write(',');
                    }
                    var val = FormatValue(reader[i]);
                    if (val == null)
                    {
                        // NULL -> empty field, write nothing.
                    }
                    else if (val.IndexOfAny(mustQuote) >= 0)
                    {
                        // Quote the field and double any embedded quotes.
                        tw.Write('"');
                        tw.Write(val.Replace("\"", "\"\""));
                        tw.Write('"');
                    }
                    else
                    {
                        tw.Write(val);
                    }
                }
                tw.Write("\r\n");
            }
        }
    }
}
/// <summary>
/// Converts a column value from the data reader to its CSV text form.
/// Returns null for a missing value, which the caller writes as an empty field.
/// </summary>
private static string FormatValue(object v)
{
    // SqlDataReader returns DBNull.Value (not null) for database NULLs, so
    // the original null check never fired for real data; treat both as
    // "no value".
    if (v == null || v is DBNull)
    {
        return null;
    }
    if (v is DateTime dt)
    {
        // Round-trip ("O") format: unambiguous and culture-independent.
        return dt.ToString("O");
    }
    if (v is DateTimeOffset dto)
    {
        return dto.ToString("O");
    }
    if (v is byte[] ba)
    {
        // Binary columns as 0x-prefixed upper-case hex.
        var sb = new StringBuilder(2 + ba.Length * 2);
        sb.Append("0x");
        foreach (var b in ba)
        {
            sb.Append(b.ToString("X2"));
        }
        return sb.ToString();
    }
    // Everything else: default ToString (matches the original's behavior,
    // including current-culture formatting for numeric types).
    return v.ToString();
}
Performance seems to be limited by the GC handling so many string allocations - so if higher performance is required, the same translated to a non-CLR language (e.g. C++) would probably match the performance of BCP.
SSIS can perform all the steps in one package. The exact steps are probably best left to a different question, but they basically amount to synthesizing a "file number" column and using the Flat File destination. A rough example of this approach is available, though it is not a great one.
If you use SSIS (either directly or by using the Export Data Wizard), you will get a RFC 4180 compliant CSV File that you can split. An example tool to split such a file would be:
class Program
{
    // Splits rfc4180_in.csv into ~100 MB chunks named rfc4180_out0.csv,
    // rfc4180_out1.csv, ... cutting only on record boundaries.
    static void Main(string[] args)
    {
        const long chunkBytes = 100L /* mb per chunk */ * 1024 * 1024;
        var fileNumber = 0;
        Func<Stream> nextChunkFile = () => File.Create($"rfc4180_out{fileNumber++}.csv");
        using (var source = File.OpenRead("rfc4180_in.csv"))
        using (var splitter = new CsvRfc4180SplittingWriteStream(nextChunkFile, chunkBytes))
        {
            source.CopyTo(splitter);
        }
    }
}
/// <summary>
/// Write-only stream that forwards writes to a sequence of underlying streams
/// produced by a factory. Once the current stream has grown past
/// cutAfterPosition bytes, the next cut point reported by
/// ParseDataGetCutPoint starts a new stream, so each file is at least
/// cutAfterPosition bytes long.
/// </summary>
abstract class SplittingWriteStream : Stream
{
    private long _TotalPosition;            // bytes written across all streams
    private long CurrentStreamPos;          // bytes written to the current stream
    private readonly long CutAfterPosition; // minimum stream size before cutting
    private readonly Func<Stream> StreamCtor;
    private Stream CurrentStream;

    public SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
    {
        if (cutAfterPosition < 0L)
        {
            throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));
        }
        this.CutAfterPosition = cutAfterPosition;
        this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));
        this.CurrentStream = createStream();
    }

    protected override void Dispose(bool disposing)
    {
        // Honor the disposing flag and chain to base (the original did
        // neither). CurrentStream can be null if the factory threw mid-cut.
        if (disposing)
        {
            CurrentStream?.Dispose();
        }
        base.Dispose(disposing);
    }

    public override void Flush() => CurrentStream.Flush();

    public override void Write(byte[] buffer, int offset, int count)
    {
        // Only request a cut once the current stream is big enough, but the
        // subclass still sees every byte so it can track parser state.
        var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);
        if (cutPoint < 0)
        {
            CurrentStream.Write(buffer, offset, count);
            CurrentStreamPos += count;
        }
        else
        {
            if (cutPoint > 0)
            {
                CurrentStream.Write(buffer, offset, cutPoint);
            }
            try
            {
                CurrentStream.Dispose();
            }
            finally
            {
                // Null first so a throwing factory doesn't leave a disposed
                // stream looking live.
                CurrentStream = null;
                CurrentStreamPos = 0L;
                CurrentStream = StreamCtor();
            }
            if (cutPoint != count)
            {
                CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);
            }
            // Credit the new stream with only the bytes written after the cut.
            // The original added the whole count, overstating the new stream's
            // size and causing subsequent cuts to happen too early.
            CurrentStreamPos = count - cutPoint;
        }
        _TotalPosition += count;
    }

    /// <summary>
    /// Returns the index (relative to offset) at which to cut, or -1 for no
    /// cut. Called for every write so implementations can maintain state.
    /// </summary>
    protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint);

    #region Stream Write-only stubs
    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override long Position
    {
        get => _TotalPosition;
        set => throw new NotSupportedException();
    }
    #endregion
}
/// <summary>
/// Splits an RFC 4180 CSV byte stream on record boundaries: a CRLF that is
/// not inside a quoted field. Quote state is kept across Write calls, so
/// quoted fields spanning buffers are handled. A CRLF pair split across two
/// buffers is simply not used as a cut point (the next one will be).
/// </summary>
class CsvRfc4180SplittingWriteStream : SplittingWriteStream
{
    public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    bool inQuotedString; // currently inside a quoted field
    bool lastWasQuote;   // previous byte was '"' (to recognize "" escapes)

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        int? cutPoint = null;
        for (int n = 0; n < count; n++)
        {
            var i = n + offset;
            StepState(buffer[i]);
            // Cut only at an unquoted CRLF; keep the first one found but keep
            // stepping state over the rest of the buffer.
            if (getCutPoint && !inQuotedString && cutPoint == null
                && buffer[i] == '\r' && n + 1 < count && buffer[i + 1] == '\n')
            {
                // Cut AFTER the CRLF so every chunk ends with a complete,
                // terminated record. The original cut before it, which left
                // the previous file unterminated and started the next file
                // with a blank line. This also matches the CRLF splitters
                // used for the bcp pipe.
                cutPoint = n + 2;
            }
        }
        return cutPoint ?? -1;
    }

    // Advances the RFC 4180 quote state machine by one byte.
    private void StepState(byte v)
    {
        var isQuote = v == '"';
        if (lastWasQuote)
        {
            lastWasQuote = false;
            if (isQuote)
            {
                // Double quote: an escaped literal quote inside a quoted
                // field, or an empty field outside one. Neither toggles state.
            }
            else
            {
                // A lone quote toggles quoted-field state.
                inQuotedString ^= true;
            }
        }
        else
        {
            lastWasQuote = isQuote;
        }
    }
}
If BCP is desired, and its (bad) handling of CSV is tolerable, it can write to a named pipe stream to split on the fly.
class Program
{
// Runs bcp.exe, pointing it at a named pipe instead of a file, and splits
// the bytes coming through the pipe into ~100 MB chunk files on the fly.
static void Main(string[] args)
{
Thread copyThread;
// Unique pipe name so concurrent runs don't collide.
var pipeId = $"bcp_{Guid.NewGuid():n}";
// bcp requires read/write pipe
using (var np = new NamedPipeServerStream(pipeId))
{
// Reader side: drain the pipe on a separate thread while bcp writes to it.
copyThread = new Thread(_1 =>
{
// Blocks until bcp opens the pipe as its output "file".
np.WaitForConnection();
int n = 0;
// Use CrlfUtf16leSplittingWriteStream with -w (UTF 16 Little Endian)
// Use CrlfUtf8SplittingWriteStream other (UTF 8 / ANSII / ASCII / OEM)
using (var dst = new CrlfUtf16leSplittingWriteStream(() => File.Create($"rfc4180_out{n++}.csv"), 100 /* mb per chunk */ * 1024 * 1024))
{
np.CopyTo(dst);
}
});
copyThread.Name = "Write thread";
// Background thread so a wedged copy cannot keep the process alive forever.
copyThread.IsBackground = true;
copyThread.Start();
// NOTE(review): bcp path and credentials are hard-coded - confirm for the
// target machine before use.
var bcp = Process.Start(
@"C:\Program Files\Microsoft SQL Server\Client SDK\ODBC\170\Tools\Binn\bcp.exe",
$@"FWDB.Rx.RxBatches out \\.\pipe\{pipeId} -S (local) -U sa -P abc -w -t,");
bcp.WaitForExit();
}
// Wait for the copy thread to flush and close the final chunk file.
copyThread.Join();
}
}
/// <summary>
/// Splits a UTF-16 LE byte stream just after each CRLF, i.e. after the byte
/// pattern 0D 00 0A 00. Intended for bcp output produced with -w.
/// </summary>
class CrlfUtf16leSplittingWriteStream : SplittingWriteStream
{
    public CrlfUtf16leSplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        if (!getCutPoint)
        {
            return -1;
        }
        // The pattern is 4 bytes long, so stop scanning 3 bytes early.
        var scanEnd = offset + count - 3;
        for (var i = offset; i < scanEnd; i++)
        {
            if (buffer[i] == '\r' && buffer[i + 1] == 0
                && buffer[i + 2] == '\n' && buffer[i + 3] == 0)
            {
                // Split after the CRLF so the chunk ends with a full line.
                return (i - offset) + 4;
            }
        }
        return -1;
    }
}
/// <summary>
/// Splits a single-byte-encoded stream (UTF-8 / ANSI / ASCII / OEM) just
/// after each CRLF byte pair.
/// </summary>
class CrlfUtf8SplittingWriteStream : SplittingWriteStream
{
    public CrlfUtf8SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        if (!getCutPoint)
        {
            return -1;
        }
        // The pattern is 2 bytes long, so stop scanning 1 byte early.
        var scanEnd = offset + count - 1;
        for (var i = offset; i < scanEnd; i++)
        {
            if (buffer[i] == '\r' && buffer[i + 1] == '\n')
            {
                // Split after the CRLF so the chunk ends with a full line.
                return (i - offset) + 2;
            }
        }
        return -1;
    }
}
/// <summary>
/// Write-only stream that forwards writes to a sequence of underlying streams
/// produced by a factory. Once the current stream has grown past
/// cutAfterPosition bytes, the next cut point reported by
/// ParseDataGetCutPoint starts a new stream, so each file is at least
/// cutAfterPosition bytes long.
/// </summary>
abstract class SplittingWriteStream : Stream
{
    private long _TotalPosition;            // bytes written across all streams
    private long CurrentStreamPos;          // bytes written to the current stream
    private readonly long CutAfterPosition; // minimum stream size before cutting
    private readonly Func<Stream> StreamCtor;
    private Stream CurrentStream;

    public SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
    {
        if (cutAfterPosition < 0L)
        {
            throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));
        }
        this.CutAfterPosition = cutAfterPosition;
        this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));
        this.CurrentStream = createStream();
    }

    protected override void Dispose(bool disposing)
    {
        // Honor the disposing flag and chain to base (the original did
        // neither). CurrentStream can be null if the factory threw mid-cut.
        if (disposing)
        {
            CurrentStream?.Dispose();
        }
        base.Dispose(disposing);
    }

    public override void Flush() => CurrentStream.Flush();

    public override void Write(byte[] buffer, int offset, int count)
    {
        // Only request a cut once the current stream is big enough, but the
        // subclass still sees every byte so it can track parser state.
        var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);
        if (cutPoint < 0)
        {
            CurrentStream.Write(buffer, offset, count);
            CurrentStreamPos += count;
        }
        else
        {
            if (cutPoint > 0)
            {
                CurrentStream.Write(buffer, offset, cutPoint);
            }
            try
            {
                CurrentStream.Dispose();
            }
            finally
            {
                // Null first so a throwing factory doesn't leave a disposed
                // stream looking live.
                CurrentStream = null;
                CurrentStreamPos = 0L;
                CurrentStream = StreamCtor();
            }
            if (cutPoint != count)
            {
                CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);
            }
            // Credit the new stream with only the bytes written after the cut.
            // The original added the whole count, overstating the new stream's
            // size and causing subsequent cuts to happen too early.
            CurrentStreamPos = count - cutPoint;
        }
        _TotalPosition += count;
    }

    /// <summary>
    /// Returns the index (relative to offset) at which to cut, or -1 for no
    /// cut. Called for every write so implementations can maintain state.
    /// </summary>
    protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint);

    #region Stream Write-only stubs
    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override long Position
    {
        get => _TotalPosition;
        set => throw new NotSupportedException();
    }
    #endregion
}
/// <summary>
/// Splits an RFC 4180 CSV byte stream on record boundaries: a CRLF that is
/// not inside a quoted field. Quote state is kept across Write calls, so
/// quoted fields spanning buffers are handled. A CRLF pair split across two
/// buffers is simply not used as a cut point (the next one will be).
/// </summary>
class CsvRfc4180SplittingWriteStream : SplittingWriteStream
{
    public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    bool inQuotedString; // currently inside a quoted field
    bool lastWasQuote;   // previous byte was '"' (to recognize "" escapes)

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        int? cutPoint = null;
        for (int n = 0; n < count; n++)
        {
            var i = n + offset;
            StepState(buffer[i]);
            // Cut only at an unquoted CRLF; keep the first one found but keep
            // stepping state over the rest of the buffer.
            if (getCutPoint && !inQuotedString && cutPoint == null
                && buffer[i] == '\r' && n + 1 < count && buffer[i + 1] == '\n')
            {
                // Cut AFTER the CRLF so every chunk ends with a complete,
                // terminated record. The original cut before it, which left
                // the previous file unterminated and started the next file
                // with a blank line. This also matches the CRLF splitters
                // used for the bcp pipe.
                cutPoint = n + 2;
            }
        }
        return cutPoint ?? -1;
    }

    // Advances the RFC 4180 quote state machine by one byte.
    private void StepState(byte v)
    {
        var isQuote = v == '"';
        if (lastWasQuote)
        {
            lastWasQuote = false;
            if (isQuote)
            {
                // Double quote: an escaped literal quote inside a quoted
                // field, or an empty field outside one. Neither toggles state.
            }
            else
            {
                // A lone quote toggles quoted-field state.
                inQuotedString ^= true;
            }
        }
        else
        {
            lastWasQuote = isQuote;
        }
    }
}