Search code examples
c#datareader

How to add columns to DataReader


My goal is to retrieve data from a data source, add some metadata to it and insert it to another target.

The target has schema with four more columns then the source (calculated columns).

I am using SqlBulkCopy, which requires a reader with all columns (including the 4 calculated).

Is there a way to add columns to the DataReader manually? or if it's not possible what alternatives i have for the data insertion?


Solution

  • I needed to do this, and also have the ability to make columns based on other columns and have the column's value depend on the row index of the reader. This class implements IWrappedReader from Dapper if thats useful to you (it was for me). The class isn't complete as I didn't implement all the IDataRecord fields but you can look at how I did IDataRecord.GetInt32 to see the simple pattern.

    /// <inheritdoc />
    public class WrappedDataReader : IDataReader
    {
        private readonly IList<AdditionalField> _additionalFields;
        private readonly int _originalOrdinalCount;
        private IDbCommand _cmd;
        private int _currentRowIndex = -1; //The first Read() will make this 0
        private IDataReader _underlyingReader;
    
        public WrappedDataReader(IDataReader underlyingReader, IList<AdditionalField> additionalFields)
        {
            _additionalFields = additionalFields;
            _underlyingReader = underlyingReader;
    
            var schema = Reader.GetSchemaTable();
            if (schema == null)
            {
                throw new ObjectDisposedException(GetType().Name);
            }
    
            _originalOrdinalCount = schema.Rows.Count;
        }
    
        public object this[int i]
        {
            get { throw new NotImplementedException(); }
        }
    
        public IDataReader Reader
        {
            get
            {
                if (_underlyingReader == null)
                {
                    throw new ObjectDisposedException(GetType().Name);
                }
    
                return _underlyingReader;
            }
        }
    
        IDbCommand IWrappedDataReader.Command
        {
            get
            {
                if (_cmd == null)
                {
                    throw new ObjectDisposedException(GetType().Name);
                }
    
                return _cmd;
            }
        }
    
        void IDataReader.Close() => _underlyingReader?.Close();
    
        int IDataReader.Depth => Reader.Depth;
    
        DataTable IDataReader.GetSchemaTable()
        {
            var rv = Reader.GetSchemaTable();
            if (rv == null)
            {
                throw new ObjectDisposedException(GetType().Name);
            }
    
            for (var i = 0; i < _additionalFields.Count; i++)
            {
                var row = rv.NewRow();
    
                row["ColumnName"] = _additionalFields[i].ColumnName;
                row["ColumnOrdinal"] = GetAppendColumnOrdinal(i);
                row["DataType"] = _additionalFields[i].DataType;
    
                rv.Rows.Add(row);
            }
    
            return rv;
        }
    
        bool IDataReader.IsClosed => _underlyingReader?.IsClosed ?? true;
    
        bool IDataReader.NextResult() => Reader.NextResult();
    
        bool IDataReader.Read()
        {
            _currentRowIndex++;
            return Reader.Read();
        }
    
        int IDataReader.RecordsAffected => Reader.RecordsAffected;
    
        void IDisposable.Dispose()
        {
            _underlyingReader?.Close();
            _underlyingReader?.Dispose();
            _underlyingReader = null;
            _cmd?.Dispose();
            _cmd = null;
        }
    
        int IDataRecord.FieldCount => Reader.FieldCount + _additionalFields.Count;
    
        bool IDataRecord.GetBoolean(int i) => Reader.GetBoolean(i);
    
        byte IDataRecord.GetByte(int i) => Reader.GetByte(i);
    
        long IDataRecord.GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) =>
            Reader.GetBytes(i, fieldOffset, buffer, bufferoffset, length);
    
        char IDataRecord.GetChar(int i) => Reader.GetChar(i);
    
        long IDataRecord.GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) =>
            Reader.GetChars(i, fieldoffset, buffer, bufferoffset, length);
    
        IDataReader IDataRecord.GetData(int i) => Reader.GetData(i);
    
        string IDataRecord.GetDataTypeName(int i) => Reader.GetDataTypeName(i);
    
        DateTime IDataRecord.GetDateTime(int i) => Reader.GetDateTime(i);
    
        decimal IDataRecord.GetDecimal(int i) => Reader.GetDecimal(i);
    
        double IDataRecord.GetDouble(int i) => Reader.GetDouble(i);
    
        Type IDataRecord.GetFieldType(int i) => Reader.GetFieldType(i);
    
        float IDataRecord.GetFloat(int i) => Reader.GetFloat(i);
    
        Guid IDataRecord.GetGuid(int i) => Reader.GetGuid(i);
    
        short IDataRecord.GetInt16(int i) => Reader.GetInt16(i);
    
        int IDataRecord.GetInt32(int i)
        {
            return i >= _originalOrdinalCount ? (int) ExecuteAdditionalFieldFunc(i) : Reader.GetInt32(i);
        }
    
        long IDataRecord.GetInt64(int i) => Reader.GetInt64(i);
    
        string IDataRecord.GetName(int i)
        {
            return i >= _originalOrdinalCount ? _additionalFields[GetAppendColumnIndex(i)].ColumnName : Reader.GetName(i);
        }
    
        int IDataRecord.GetOrdinal(string name)
        {
            for (var i = 0; i < _additionalFields.Count; i++)
            {
                if (name.Equals(_additionalFields[i].ColumnName, StringComparison.OrdinalIgnoreCase))
                {
                    return GetAppendColumnOrdinal(i);
                }
            }
    
            return Reader.GetOrdinal(name);
        }
    
        string IDataRecord.GetString(int i) => Reader.GetString(i);
    
        object IDataRecord.GetValue(int i)
        {
            return i >= _originalOrdinalCount ? ExecuteAdditionalFieldFunc(i) : Reader.GetValue(i);
        }
    
        int IDataRecord.GetValues(object[] values) => Reader.GetValues(values);
    
        bool IDataRecord.IsDBNull(int i)
        {
            return i >= _originalOrdinalCount ? ExecuteAdditionalFieldFunc(i) == null : Reader.IsDBNull(i);
        }
    
        object IDataRecord.this[string name]
        {
            get
            {
                var ordinal = ((IDataRecord) this).GetOrdinal(name);
                return ((IDataRecord) this).GetValue(ordinal);
            }
        }
    
    
        object IDataRecord.this[int i] => ((IDataRecord) this).GetValue(i);
    
        private int GetAppendColumnOrdinal(int index)
        {
            return _originalOrdinalCount + index;
        }
    
        private int GetAppendColumnIndex(int oridinal)
        {
            return oridinal - _originalOrdinalCount;
        }
    
        private object ExecuteAdditionalFieldFunc(int oridinal)
        {
            return _additionalFields[GetAppendColumnIndex(oridinal)].Func(_currentRowIndex, Reader);
        }
    
        public struct AdditionalField
        {
            public AdditionalField(string columnName, Type dataType, Func<int, IDataReader, object> func = null)
            {
                ColumnName = columnName;
                DataType = dataType;
                Func = func;
            }
    
            public string ColumnName;
            public Type DataType;
            public Func<int, IDataReader, object> Func;
        }
    }
    

    Here is a quick NUnit test I wrote to show case its usage

    [Test]
    public void ReaderTest()
    {
        using (var conn = new SqlConnection(ConnectionSettingsCollection.Default))
        {
            conn.Open();
            const string sql = @"
                SELECT 1 as OriginalField
                UNION
                SELECT -500 as OriginalField
                UNION
                SELECT 100 as OriginalField
            ";
    
            var additionalFields = new[]
            {
                new WrappedDataReader.AdditionalField("StaticField", typeof(int), delegate { return "X"; }),
                new WrappedDataReader.AdditionalField("CounterField", typeof(int), (i, reader) => i),
                new WrappedDataReader.AdditionalField("ComputedField", typeof(int), (i, reader) => (int) reader["OriginalField"] + 1000)
            };
    
            const string expectedJson = @"
                [
                    {""OriginalField"":-500,""StaticField"":""X"",""CounterField"":0,""ComputedField"":500},
                    {""OriginalField"":1,   ""StaticField"":""X"",""CounterField"":1,""ComputedField"":1001},
                    {""OriginalField"":100, ""StaticField"":""X"",""CounterField"":2,""ComputedField"":1100}
                ]
            ";
    
            var actualJson = ToJson(new WrappedDataReader(new SqlCommand(sql, conn).ExecuteReader(), additionalFields));
    
            Assert.Zero(CultureInfo.InvariantCulture.CompareInfo.Compare(expectedJson, actualJson, CompareOptions.IgnoreSymbols));
        }
    }
    
    private static string ToJson(IDataReader reader)
    {
        using (var strWriter = new StringWriter(new StringBuilder()))
        using (var jsonWriter = new JsonTextWriter(strWriter))
        {
            jsonWriter.WriteStartArray();
    
            while (reader.Read())
            {
                jsonWriter.WriteStartObject();
    
                for (var i = 0; i < reader.FieldCount; i++)
                {
                    jsonWriter.WritePropertyName(reader.GetName(i));
                    jsonWriter.WriteValue(reader[i]);
                }
    
                jsonWriter.WriteEndObject();
            }
    
            jsonWriter.WriteEndArray();
    
            return strWriter.ToString();
        }
    }