
WCF streaming a large number of objects


I have a WCF service that queries a database and returns a large number of records. There are so many records that the server runs out of memory and fails before it can return.

So I want to send the records back as I fetch them from the database, or a set number at a time.

For additional clarity, I cannot collect all the fetched records into a collection on the server, as the server runs out of memory before I have collected them all. I want to find a way to send them back one by one, or in chunks, in one call.

For example, in chunks (a rough sketch follows the list):

  1. Fetch first 1000 records
  2. Add to collection
  3. Send collection to client
  4. Clear collection
  5. Fetch next 1000 records, and repeat from step 2
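
A rough sketch of that loop (ReadCustomer is a hypothetical helper that maps a data-reader row to a Customer; ConnectionString and PrepareQuery are the same members used in the code below):

public IEnumerable<List<Customer>> GetCustomersInChunks(int chunkSize = 1000)
{
    using (var connection = new SqlConnection(ConnectionString))
    using (var command = connection.CreateCommand())
    {
        command.CommandText = PrepareQuery();
        connection.Open();

        using (var reader = command.ExecuteReader())
        {
            var chunk = new List<Customer>(chunkSize);
            while (reader.Read())
            {
                chunk.Add(ReadCustomer(reader)); // hypothetical row-to-Customer mapper
                if (chunk.Count == chunkSize)
                {
                    yield return chunk;                    // send the current chunk
                    chunk = new List<Customer>(chunkSize); // start a fresh one
                }
            }

            if (chunk.Count > 0)
                yield return chunk; // send the final partial chunk
        }
    }
}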

So the idea I have is that the web service code would look something like this:

public IEnumerable<Customer> GetAllCustomers()
{
     // Setup Query
     string query = PrepareQuery();

     // Create Connection
     var connection = new SqlConnection(ConnectionString);
     connection.Open();

     var sqlcommand = connection.CreateCommand();
     sqlcommand.CommandText = query;

     // Read Results
     var reader = sqlcommand.ExecuteReader();
     while (reader.Read())
     {
         Customer customer = new Customer();
         foreach (var column in Columns)
         {
             int fieldIndex = reader.GetOrdinal(column.Name);
             object value = reader.GetValue(fieldIndex);
             customer[column.Name] = value;
         }

         yield return customer;
     }
}

I don't want to consider paging, as the ORDER BY on the SQL Server is slow.

Looking for a way to do this in WCF.


Solution

  • Thanks to mikelegg & Reniuz for helping me come to a solution. I wish I could give them the tick for the right answer, but I am afraid the next developer to read this question would not fully benefit. So here is what I ended up with.

    1. Set up the config files for the server and client (follow the link: Large Data and Streaming); a minimal binding sketch in code follows this list.
    2. Followed this solution; the source code can be downloaded from here.
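
    A minimal sketch of the streamed binding from step 1, written in code rather than config; the quota and timeout values below are assumptions (not taken from the linked article), so tune them for your own data volume:

        var binding = new System.ServiceModel.BasicHttpBinding
        {
            // Stream the message body instead of buffering the whole response in memory
            TransferMode = System.ServiceModel.TransferMode.Streamed,

            // Allow very large responses (assumed value)
            MaxReceivedMessageSize = long.MaxValue,

            // Long-running transfers need generous timeouts (assumed values)
            SendTimeout = System.TimeSpan.FromMinutes(10),
            ReceiveTimeout = System.TimeSpan.FromMinutes(10)
        };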

    I had to change the DBRowStream.DBThreadProc method a bit to get it to work, so I am posting the source code:

    DBRowStream Class:
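
    The method below also relies on fields declared elsewhere in the downloaded DBRowStream class. Roughly, inferred from how DBThreadProc uses them (the exact types in the downloaded source may differ), they look like this:

        // Assumed field declarations, inferred from usage in DBThreadProc below.
        MemoryStream memStream1 = new MemoryStream();   // buffer being filled or read
        MemoryStream memStream2 = new MemoryStream();   // second buffer, swapped in while the other is read
        int memStreamWriteStatus;                       // 1 = writing to stream 1, 2 = writing to stream 2

        AutoResetEvent readyToWriteToMemStream1 = new AutoResetEvent(false);
        AutoResetEvent readyToWriteToMemStream2 = new AutoResetEvent(false);
        AutoResetEvent readyToSendFromMemStream1 = new AutoResetEvent(false);
        AutoResetEvent readyToSendFromMemStream2 = new AutoResetEvent(false);

        volatile bool bDoneWriting;                     // writer thread has produced all rows
        volatile bool bCanRead = true;                  // reader side may keep calling Read

        const int PAGESIZE = 10000;                     // rows serialized into a buffer before swapping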

        void DBThreadProc(object o)
        {
            SqlConnection con = null;
            SqlCommand com = null;
    
            try
            {
                con = new System.Data.SqlClient.SqlConnection(/*ConnectionString*/);
                com = new SqlCommand();
                com.Connection = con;
                com.CommandText = PrepareQuery();
                con.Open();
                SqlDataReader reader = com.ExecuteReader();
    
                int count = 0;
    
                MemoryStream memStream = memStream1;
                memStreamWriteStatus = 1;
                readyToWriteToMemStream1.WaitOne();
    
                while (reader.Read())
                {
                    // Populate
                    Customer customer = new Customer();
                    foreach (var column in Columns)
                    {
                        int fieldIndex = reader.GetOrdinal(column.Name);
                        object value = reader.GetValue(fieldIndex);
                        customer[column.Name] = value;
                    }                   
    
                    // Serialize: I used a custom Serializer 
                    // but BinaryFormatter should be fine
                    DBDataFormatter.Serialize(memStream, customer);
    
                    count++;
    
                    if (count == PAGESIZE) // const int PAGESIZE = 10000
                    {
                        switch (memStreamWriteStatus)
                        {
                            case 1: // done writing to stream 1
                                {
                                    memStream1.Position = 0;
                                    readyToSendFromMemStream1.Set();
                                    // write stream 1 is done...waiting for stream 2 
                                    readyToWriteToMemStream2.WaitOne();
                                    memStream = memStream2;
                                    memStream.Position = 0;
                                    memStream.SetLength(0); // Added: reset the stream, otherwise garbage data came back
                                    memStreamWriteStatus = 2;
    
                                    break;
                                }
                            case 2: // done writing to stream 2
                                {
                                    memStream2.Position = 0;
                                    readyToSendFromMemStream2.Set();
                                    // Write on stream 2 is done...waiting for stream 1
                                    readyToWriteToMemStream1.WaitOne();
                                    // done waiting for stream 1 
                                    memStream = memStream1;
                                    memStreamWriteStatus = 1;
                                    memStream.Position = 0;
                                    memStream.SetLength(0); // Added: Reset the stream. Else was getting garbage data back
    
                                    break;
                                }
                        }
                        count = 0;
                    }
                }
    
                if (count > 0)
                {
                    switch (memStreamWriteStatus)
                    {
                        case 1: // done writing to stream 1
                            {
                                memStream1.Position = 0;
                                readyToSendFromMemStream1.Set();
                                // END write stream 1 is done...waiting for stream 2 
                                break;
                            }
                        case 2: // done writing to stream 2
                            {
                                memStream2.Position = 0;
                                readyToSendFromMemStream2.Set();
                                // END write stream 2 is done...waiting for stream 1 
                                break;
                            }
                    }
                }
                bDoneWriting = true;
                bCanRead = false;
            }
            catch
            {
                throw;
            }
            finally
            {
                if (com != null)
                {
                    com.Dispose();
                    com = null;
                }
                if (con != null)
                {
                    con.Close();
                    con.Dispose();
                    con = null;
                }
            }
        }
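
    For context, the service operation itself just hands this custom stream back to WCF. A minimal sketch of the service side, with the contract and implementation names assumed from the generated client used below (the DBRowStream constructor is an assumption as well):

        using System.IO;
        using System.ServiceModel;

        [ServiceContract]
        public interface IDataService
        {
            // Returning a Stream, combined with transferMode="Streamed",
            // lets WCF send data while the worker thread is still producing it.
            [OperationContract]
            Stream GetDBRowStream();
        }

        public class DataService : IDataService
        {
            public Stream GetDBRowStream()
            {
                // DBRowStream starts the DBThreadProc worker shown above and serves
                // the serialized rows through its Read override, alternating between
                // the two MemoryStream buffers.
                return new DBRowStream();
            }
        }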
    

    And then the Client side:

    private static void TestGetRecordsAndDump()
    {
        const string FILE_NAME = "Records.CSV";
        File.Delete(FILE_NAME);
        var file = File.AppendText(FILE_NAME);
        long count = 0;
        try
        {
            var service = new ServiceReference1.DataServiceClient();
            var stream = service.GetDBRowStream();
    
            Console.WriteLine("Records Retrieved : ");
            Console.WriteLine("File Size (MB)    : ");
    
            var canDoLastRead = true;
            while (stream.CanRead && canDoLastRead)
            {
               try
               {
                   Customer customer = DBDataFormatter.Deserialize(stream); // Used custom Deserializer, but BinaryFormatter should be fine
    
                   file.Write(customer.ToString());
    
                   count++;
               }
               catch
               {
                    canDoLastRead = false; // Bug: stream.CanRead is not set to false at the end of the stream, so I use this trick to know when all records have been returned.
               }
               finally
               {
                   Console.SetCursorPosition("Records Retrieved : ".Length, 0);
                   Console.Write(string.Format("{0}               ", count));
                   Console.SetCursorPosition("File Size (MB)    : ".Length, 1);
                   Console.Write(string.Format("{0:G}             ", file.BaseStream.Length / 1024f / 1024f));     
               }
            }
        }
        finally
        {
            file.Close();
        }
    }
    

    There is a bug I cannot seem to solve: stream.CanRead is not set to false when all the records have been returned. I have not been able to work out why, but at least now I can query large data sets and return all the records without the server or the client running out of memory.