
Using a thread pool to parallelize a function makes it slower: why?


I am working on a database that runs on top of RocksDB. I have a find function that takes a query as a parameter, iterates over all documents in the database, and returns the documents that match the query. I want to parallelize this function so the work is spread across multiple threads.

To achieve that, I tried to use ThreadPool: I moved the body of the loop into a lambda and added one task to the thread pool per document. After the loop, the main thread processes each result.

Current version (single thread):

void
EmbeDB::find(const bson_t& query,
             DocumentPtrCallback callback,
             int32_t limit,
             const bson_t* projection)
{
    int32_t count = 0;
    bson_error_t error;
    uint32_t num_query_keys = bson_count_keys(&query);
    mongoc_matcher_t* matcher = num_query_keys != 0
        ? mongoc_matcher_new(&query, &error)
        : nullptr;

    if (num_query_keys != 0 && matcher == nullptr)
    {
        callback(&error, nullptr);
        return;
    }

    bson_t document;
    rocksdb::Iterator* it = _db->NewIterator(rocksdb::ReadOptions());
    for (it->SeekToFirst(); it->Valid(); it->Next())
    {
        const char* bson_data = (const char*)it->value().data();
        int bson_length = it->value().size();
        std::vector<char> decrypted_data;
        if (encryptionEnabled())
        {
            decrypted_data.resize(bson_length);
            bson_length = decrypt_data(bson_data, bson_length, decrypted_data.data(), _encryption_method, _encryption_key, _encryption_iv);
            bson_data = decrypted_data.data();
        }
        bson_init_static(&document, (const uint8_t*)bson_data, bson_length);

        if (num_query_keys == 0 || mongoc_matcher_match(matcher, &document))
        {
            ++count;

            if (projection != nullptr)
            {
                bson_error_t error;
                bson_t projected;
                bson_init(&projected);

                mongoc_matcher_projection_execute_noop(
                    &document,
                    projection,
                    &projected,
                    &error,
                    NULL
                );

                callback(nullptr, &projected);
            }
            else
            {
                callback(nullptr, &document);
            }

            if (limit >= 0 && count >= limit)
            {
                break;
            }
        }
    }
    delete it;

    if (matcher)
    {
        mongoc_matcher_destroy(matcher);
    }
}

New version (multi-thread):

void
EmbeDB::find(const bson_t& query,
             DocumentPtrCallback callback,
             int32_t limit,
             const bson_t* projection)
{
    int32_t count = 0;
    bool limit_reached = limit == 0;
    bson_error_t error;
    uint32_t num_query_keys = bson_count_keys(&query);
    mongoc_matcher_t* matcher = num_query_keys != 0
        ? mongoc_matcher_new(&query, &error)
        : nullptr;

    if (num_query_keys != 0 && matcher == nullptr)
    {
        callback(&error, nullptr);
        return;
    }

    auto process_document = [this, projection, num_query_keys, matcher](const char* bson_data, int bson_length) -> bson_t*
    {
        std::vector<char> decrypted_data;
        if (encryptionEnabled())
        {
            decrypted_data.resize(bson_length);
            bson_length = decrypt_data(bson_data, bson_length, decrypted_data.data(), _encryption_method, _encryption_key, _encryption_iv);
            bson_data = decrypted_data.data();
        }

        bson_t* document = new bson_t();

        bson_init_static(document, (const uint8_t*)bson_data, bson_length);

        if (num_query_keys == 0 || mongoc_matcher_match(matcher, document))
        {
            if (projection != nullptr)
            {
                bson_error_t error;
                bson_t* projected = new bson_t();
                bson_init(projected);

                mongoc_matcher_projection_execute_noop(
                    document,
                    projection,
                    projected,
                    &error,
                    NULL
                );

                delete document;

                return projected;
            }
            else
            {
                return document;
            }
        }
        else
        {
            delete document;

            return nullptr;
        }

    };

    const int WORKER_COUNT = std::max(1u, std::thread::hardware_concurrency());

    ThreadPool pool(WORKER_COUNT);
    std::vector<std::future<bson_t*>> futures;

    bson_t document;
    rocksdb::Iterator* db_it = _db->NewIterator(rocksdb::ReadOptions());
    for (db_it->SeekToFirst(); db_it->Valid(); db_it->Next())
    {
        const char* bson_data = (const char*)db_it->value().data();
        int bson_length = db_it->value().size();

        futures.push_back(pool.enqueue(process_document, bson_data, bson_length));
    }
    delete db_it;

    for (auto it = futures.begin(); it != futures.end(); ++it)
    {
        bson_t* result = it->get();

        if (result)
        {
            count += 1;

            if (limit < 0 || count < limit)
            {
                callback(nullptr, result);
            }

            delete result;
        }
    }

    if (matcher)
    {
        mongoc_matcher_destroy(matcher);
    }
}

  • With simple documents and a simple query, the single-thread version processes 1 million documents in 0.5 seconds on my machine.
  • With the same documents and query, the multi-thread version processes 1 million documents in 3.3 seconds.

Surprisingly, the multi-thread version is way slower. Moreover, I measured the execution time: 75% of it is spent in the for loop, so basically the line futures.push_back(pool.enqueue(process_document, bson_data, bson_length)); accounts for 75% of the time.

I did the following:

  • I checked the value of WORKER_COUNT, it is 6 on my machine.
  • I tried to add futures.reserve(1000000), thinking that maybe the vector re-allocation was at fault, but it didn't change anything.
  • I tried to remove the dynamic memory allocations (bson_t* document = new bson_t();), but it didn't change the result significantly.

So my question is: did I do something wrong that makes the multi-thread version so much slower than the single-thread version?

My current understanding is that the synchronization operations of the thread pool (when tasks are enqueued and dequeued) simply consume the majority of the time, and that the solution would be to change the data structure. Thoughts?


Solution

  • Parallelization has overhead.

    In the single-threaded version, processing each document takes around 500 nanoseconds (1 million documents in 0.5 s). There is a lot of bookkeeping involved in delegating work to a thread pool, both to hand each job off and to synchronize on its result afterwards, and all that bookkeeping can easily cost more than 500 nanoseconds per job.

    Assuming your code is correct, the bookkeeping here takes around 2800 nanoseconds per job: 3.3 s for 1 million jobs is about 3300 ns per job, minus the ~500 ns of actual work. To get a significant speedup from parallelization, you need to break the work into bigger chunks.

    I recommend processing documents in batches of 1000 at a time: each future, instead of corresponding to a single document, corresponds to 1000 documents (a sketch of this is included at the end of this answer).

    Other optimizations

    If possible, avoid unnecessary copying. If something gets copied a bunch, see if you can capture it by reference instead of by value.
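
    For example, here is a generic illustration of that point (not taken from the code above; load_something and use are hypothetical placeholders):

        std::vector<char> big_buffer = load_something();  // hypothetical large object

        // Capturing by value copies big_buffer into the closure of every task
        // built from this lambda:
        auto by_value = [big_buffer]() { use(big_buffer); };

        // Capturing by reference avoids the copy; this is safe only because
        // big_buffer outlives every task that uses it:
        auto by_reference = [&big_buffer]() { use(big_buffer); };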
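
    Finally, here is a minimal sketch of the batching idea recommended above. It assumes the same ThreadPool::enqueue interface and the libbson/RocksDB calls already used in the question; the batch size, the process_batch helper, and the use of bson_copy/bson_destroy to give each result its own storage are choices made for this sketch, not part of the original code. For brevity it omits the encryption and projection branches, which would move into the inner loop unchanged (and, as in your multi-threaded attempt, it assumes mongoc_matcher_match may be called concurrently):

        // Each task owns the raw bytes of its batch: a RocksDB Slice is only
        // valid until the iterator advances, so the bytes have to be copied out anyway.
        using Batch = std::vector<std::string>;

        auto process_batch = [num_query_keys, matcher](const Batch& batch) -> std::vector<bson_t*>
        {
            std::vector<bson_t*> results;
            for (const std::string& bson_data : batch)
            {
                bson_t document;
                bson_init_static(&document,
                                 (const uint8_t*)bson_data.data(),
                                 bson_data.size());

                if (num_query_keys == 0 || mongoc_matcher_match(matcher, &document))
                {
                    // bson_copy gives the match its own storage, so it stays
                    // valid after the batch strings are destroyed.
                    results.push_back(bson_copy(&document));
                }
            }
            return results;
        };

        const size_t BATCH_SIZE = 1000; // starting point, worth tuning

        ThreadPool pool(WORKER_COUNT);
        std::vector<std::future<std::vector<bson_t*>>> futures;

        Batch current_batch;
        current_batch.reserve(BATCH_SIZE);

        rocksdb::Iterator* db_it = _db->NewIterator(rocksdb::ReadOptions());
        for (db_it->SeekToFirst(); db_it->Valid(); db_it->Next())
        {
            current_batch.emplace_back(db_it->value().data(), db_it->value().size());

            if (current_batch.size() == BATCH_SIZE)
            {
                // One enqueue per BATCH_SIZE documents instead of one per document.
                futures.push_back(pool.enqueue(process_batch, std::move(current_batch)));
                current_batch.clear();
                current_batch.reserve(BATCH_SIZE);
            }
        }
        if (!current_batch.empty())
        {
            futures.push_back(pool.enqueue(process_batch, std::move(current_batch)));
        }
        delete db_it;

        // The main thread drains each batch of results and invokes the callback.
        for (auto& future : futures)
        {
            for (bson_t* result : future.get())
            {
                ++count;
                if (limit < 0 || count <= limit)
                {
                    callback(nullptr, result);
                }
                bson_destroy(result);
            }
        }

    The point is that the enqueue/future overhead is now paid once per 1000 documents instead of once per document, so it is amortized down to a few nanoseconds per document.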