c++ multithreading cassandra datastax

Cassandra session data corruption from multithreaded read


I am having an issue with the Cassandra API and select queries. In a single-threaded context it works just fine. However, when multiple threads use the same object and call the same function, even with just two threads, Cassandra returns its futures with bad data (sometimes it appears to be overwritten with data from other select queries, other times it is just garbage). I have a singleton object handling my calls to Cassandra, and a byte buffer struct to hold the data returned.

The values are ~1 KB each, and the keys are 32 bytes.

In the code below you can see two lines surrounded by commented-out mutex locks. If the locks are uncommented, the incorrect data issue goes away, but so do any performance gains from increasing the number of threads.

Additionally, roughly 33% of the queries come back corrupted.

The Cassandra API is supposed to be able to handle multiple threads and connections to the session without any issues (as per http://datastax.github.io/cpp-driver/topics/), so why am I getting back bad data?

I am using CentOS 7, C++14, and Cassandra API 2.15 with Cassandra 3.11.4.

#include <cassert> 
#include "DataObj.h"//contains the rest of my includes

//byteBuf has two members: a const uint8_t pointer, and a uint32_t variable for declaring the size
DataObj::byteBuf DataObj::threadGet(DataObj::byteBuf key)
{
  CassError rc = CASS_OK;
  CassStatement* statement = NULL;
  CassFuture* future = NULL;
  string temp = "SELECT value FROM " + keyspace + "." + tablename + " WHERE id = ?"; // variables defined elsewhere
  const char* query = (const char*)temp.c_str();
  statement = cass_statement_new(query, 1);
  // I am also having issues with prepared statements not working properly,
  // but I believe it is not directly related to this question. This setup was working fine on 1 thread
  cass_statement_bind_bytes(statement, 0, key.data, key.size);
  //rw_mut.lock();
  future = cass_session_execute(m_session, statement); // m_session is the cassandra session of the object
  cass_future_wait(future);
  //rw_mut.unlock();
  // The two statements above, when gated by the mutex, do not produce errors when multithreading,
  // but also do not gain any performance.
  // When not gated, works fine on a single thread, but corrupts the return data on 2 or more threads

  const uint8_t* st = nullptr;
  size_t len = 0;
  rc = cass_future_error_code(future);
  if (rc != CASS_OK)
  {
    //error handling...
  }
  else
  {
    const CassResult* result = cass_future_get_result(future);
    CassIterator* iterator = cass_iterator_from_result(result);
    if (cass_iterator_next(iterator))
    {
      const CassRow* row = cass_iterator_get_row(iterator);
      cass_value_get_bytes(cass_row_get_column_by_name(row, "value"), &st, &len);
    }
    cass_result_free(result);
    cass_iterator_free(iterator);
  }
  DataObj::byteBuf res((uint8_t *) st, len);
  // was able to use gdb here and confirm that the data is corrupt

  cass_future_free(future);

  return res;
}

Solution

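  • It looks like you're freeing the result before a copy is made from the value pointer into the byteBuf, so st ends up pointing at memory that has already been released. In the single-threaded version, you may just be getting lucky that the freed memory is still intact when you read it. With multiple threads, that memory is likely to be reused and overwritten by another query in the meantime.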
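
As a rough sketch of that idea (not the real driver API: `FakeResult`, `fake_get_bytes` and `read_value_copy` are hypothetical stand-ins so the snippet compiles without cassandra.h), the bytes get copied into a buffer you own while the result is still alive, and only then is the result released:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a driver result: it owns the row bytes, and any
// pointer handed out is only valid while this object is alive.
struct FakeResult {
    std::vector<std::uint8_t> storage;
};

// Hypothetical helper shaped like cass_value_get_bytes: it hands back a
// borrowed pointer and a length, not a copy.
void fake_get_bytes(const FakeResult& r, const std::uint8_t** out, std::size_t* len) {
    *out = r.storage.data();
    *len = r.storage.size();
}

// Copy the borrowed bytes into an owning buffer BEFORE releasing the result.
std::vector<std::uint8_t> read_value_copy(std::unique_ptr<FakeResult> result) {
    const std::uint8_t* p = nullptr;
    std::size_t len = 0;
    fake_get_bytes(*result, &p, &len);

    std::vector<std::uint8_t> owned(p, p + len);  // deep copy while result is alive
    result.reset();                               // plays the role of cass_result_free
    return owned;                                 // holds no pointer into freed memory
}
```

In the question's function, the equivalent change would be to make the copy inside the `if (cass_iterator_next(iterator))` block, before `cass_result_free` and `cass_future_free` run. This assumes byteBuf can be changed to own its data (or to wrap something like a `std::vector<uint8_t>`) rather than holding only a raw pointer.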