Search code examples
c#sqldatareadersqlbulkcopyidatareader

How to implement the interface IDataReader in order to process the data before being insert?


I have a stored procedure which gives me a result set consisting of a single column which contains millions of unprocessed rows. I need to transfer these data to another server using SqlBulkCopy, but the problem is that I can't simply do the following:

using (var con = new SqlConnection(sqlConnectionStringSource))
{
    using (var cmd = new SqlCommand("usp_GetUnprocessedData", con))
    {
        cmd.CommandType = CommandType.StoredProcedure;
        con.Open();
        using (var reader = cmd.ExecuteReader())
        {
            using (var sqlBulk = new SqlBulkCopy(sqlConnectionStringDestination))
            {
                sqlBulk.DestinationTableName = "BulkCopy";
                sqlBulk.BulkCopyTimeout = 0;
                sqlBulk.BatchSize = 200000;
                sqlBulk.WriteToServer(reader);
            }
        }
    }
}

because data won't be processed at all.

In my case, the nth row of the result set looks like this:

value1_n,value2_n,value3_n

where n is just a subscript I've introduced to distinguish between the various rows.

In the destination table, which I've named BulkCopy, I'd like to have:

╔══════════╦══════════╦══════════╗
║  Field1  ║  Field2  ║  Field3  ║
╠══════════╬══════════╬══════════╣
║ Value1_1 ║ Value2_1 ║ Value3_1 ║
║ Value1_2 ║ Value2_2 ║ Value3_2 ║
║ ...      ║ ...      ║ ...      ║
║ Value1_n ║ Value2_n ║ Value3_n ║
╚══════════╩══════════╩══════════╝

I was being told to use a custom DataReader via an implementation of the IDataReader interface, in order to process data row by row before SqlBulkCopy copies the data from it, using EnableStreamingProperty = true to ensure that only a small amount of data are in memory, but I have no idea where to start. Can you help me, please?


Solution

  • Let's reverse the problem. Instead of finding a generic solution, create one specific for this problem. Having spent days creating an IDataReader wrapper I know it's not that trivial.

    We know how many fields there are, we don't care about any other fields in the results. Instead of trying to correctly implement an IDataReader wrapper, we could create an iterator method to split the data and return the records one by one in a streaming fashion. FastMember's ObjectReader can wrap an IDataReader interface over any IEnumerable :

    class MyDTO
    {
        public string Field1{get;set;}
        public string Field2{get;set;}
        public string Field3{get;set;}
    }
    
    public IEnumerable<MyDTO> ReaderToStream(IDataReader reader)
    {
        while(reader.Read())
        {
            var line=reader.GetString(0);
            var fields=String.Split(",",line);
            yield return new MyDTO{Field1=fields[0];Field2=fields[1];Field3=fields[2]};
        }
    }
    

    The importing method could change to :

    using (var con = new SqlConnection(sqlConnectionStringSource))
    {
        ...
        using (var reader = cmd.ExecuteReader())
        {
            var recordStream=ReaderToStream(reader);
            using(var rd=ObjectReader(recordStream))
            using (var sqlBulk = new SqlBulkCopy(sqlConnectionStringDestination))
            {
                ...
                sqlBulk.WriteToServer(rd);
            }
        }
    }
    

    The iterator calls Read() only when SqlBulkCopy requests a new record, so we don't end up loading everything in memory.

    And the IDataReader wrapper

    Resharper and Visual Studio 2019 offer to implement an interface by delegating calls to a wrapped class. In Visual Studio 2019 this is called Implement interface through 'field_name'.

    Starting with this code :

    class ReaderWrapper:IDataReader
    {
        private readonly IDataReader _inner ;
        public ReaderWrapper(IDataReader inner)
        {
            _inner = inner;
        }
    }
    

    Applying the refactoring gives :

    class ReaderWrapper:IDataReader
    {
        private readonly IDataReader _inner ;
        public ReaderWrapper(IDataReader inner)
        {
            _inner = inner;
        }
    
        public object this[int i] => _inner[i];
    
        public object this[string name] => _inner[name];
    
        public int Depth => _inner.Depth;
    
        public bool IsClosed => _inner.IsClosed;
    
        public int RecordsAffected => _inner.RecordsAffected;
    
        public int FieldCount => _inner.FieldCount;
    
        public void Close() => _inner.Close();
        public void Dispose() => _inner.Dispose();
        public bool GetBoolean(int i) => _inner.GetBoolean(i);
        public byte GetByte(int i) => _inner.GetByte(i);
        public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) => _inner.GetBytes(i, fieldOffset, buffer, bufferoffset, length);
        public char GetChar(int i) => _inner.GetChar(i);
        public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) => _inner.GetChars(i, fieldoffset, buffer, bufferoffset, length);
        public IDataReader GetData(int i) => _inner.GetData(i);
        public string GetDataTypeName(int i) => _inner.GetDataTypeName(i);
        public DateTime GetDateTime(int i) => _inner.GetDateTime(i);
        public decimal GetDecimal(int i) => _inner.GetDecimal(i);
        public double GetDouble(int i) => _inner.GetDouble(i);
        public Type GetFieldType(int i) => _inner.GetFieldType(i);
        public float GetFloat(int i) => _inner.GetFloat(i);
        public Guid GetGuid(int i) => _inner.GetGuid(i);
        public short GetInt16(int i) => _inner.GetInt16(i);
        public int GetInt32(int i) => _inner.GetInt32(i);
        public long GetInt64(int i) => _inner.GetInt64(i);
        public string GetName(int i) => _inner.GetName(i);
        public int GetOrdinal(string name) => _inner.GetOrdinal(name);
        public DataTable GetSchemaTable() => _inner.GetSchemaTable();
        public string GetString(int i) => _inner.GetString(i);
        public object GetValue(int i) => _inner.GetValue(i);
        public int GetValues(object[] values) => _inner.GetValues(values);
        public bool IsDBNull(int i) => _inner.IsDBNull(i);
        public bool NextResult() => _inner.NextResult();
        public bool Read() => _inner.Read();
    }
    

    To create a splitting wrapper, we need to replace Read() with our own version :

        private string[] _values;
    
        public bool Read()
        {
            var ok = _inner.Read();
            if (ok)
            {
                //It *could be null*
                if (_inner.IsDBNull(0))
                {
                    //What to do? Store an empty array for now
                    _values = new string[0];
                }
                var fieldValue = _inner.GetString(0);                
                _values= fieldValue.Split(',');
            }
            return ok;
        }
    

    This splits the CSV values and stores them in a string. This shows why implementing the wrapper as a bit of a bother - we need to handle quite a few things and decide what to do in unexpected situations like nulls, empty strings etc.

    After that, we need to add our own implementations for the methods called by SqlBulkCopy. GetValue() is definitelly called, so is FieldCount. Other members are called based on the column mapping types, by name or by ordinal.

    public int FieldCount => _values.Length;
    
    public string GetString(int ordinal) => _values[ordinal];
    
    public object GetValue(int ordinal)=> _values[ordinal];
    
    //What if we have more values than expected?
    public int GetValues(object[] values)
    {
        if (_values.Length > 0)
        {
            Array.Copy(_values, values,_values.Length);
            return _values.Length;
        }
        return 0;
    }
    

    And now the "funny" parts. What about GetName()? Probably :

    public string GetName(int ordinal) => $"Field{ordinal}";
    

    GetOrdinal ? It may be called in name mapping. Getting tricky :

    public int GetOrdinal(string name) => int.Parse(name.Substring(5));
    

    Let's hope this works.

    We also need to override the indexes :

        public object this[string name] => _values[GetOrdinal(name)];
    
        public object this[int i] => _values[i];
    

    What did I forget? ... Still need to handle arbitrary value numbers. Need to handle nulls. There's no GetSchemaTable which probably means the column mappings will have to be specified explicitly, probably by ordinal.

    A quick&dirt IsDbNull implementation could be :

    public bool IsDBNull(int i)
    {  
        //Covers the "null" case too, when `Length` is 0
        if (i>_values.Length-1)
        {
            return true;
        }
        return _inner.IsDBNull(i);
    }
    

    GetSchemaTable is harder because we don't really know how many values are in each record. The table has 20+ columns so I'd rather not write that code until I see that it's needed.

    public DataTable GetSchemaTable() => throw new NotImplementedException();
    

    Leave it as an excercise to the reader as they say

    PPS: Default interface implementations, because why not

    All this is probably a nice if convoluted case where C# 8's default interface methods could be used to create a wrapped reader trait. By default, defer to the wrapped inner reader. This would eliminate all deferred calles in the implementation.

    interface IReaderWrapper:IDataReader
    {
        //Gives access to the wrapped reader in the concrete classes
        abstract IDataReader Inner();
    
        override object this[int i] => Inner()[i];
    
        override object this[string name] => Inner()[name];
    
        override int Depth => Inner().Depth;
    
        override bool IsClosed => Inner().IsClosed;
        ...
    }
    
    class SplitterWrapper:IReaderWrapper
    {
    
        private readonly IDataReader _inner ;
        public SplitterWrapper(IDataReader inner)
        {
            _inner = inner;
        }
    
        IDataReader Inner()=> _inner;
    
        string[] _values;
        public object this[int i] => _values[i];
        ...
    }
    

    This feature doesn't work in the C# 8 compiler that came with VS 2019 and somehow crashes Sharplab.io. No idea if it will compile or if the overrides are really needed.