Search code examples
c++multithreadingboost-unordered

Threading to insert into a bimap


I would like to use multiple threads to insert into a bimap. I tried the following code:

parallel_index.cpp

#include <iostream>
#include <string>
#include <algorithm>
#include <thread>
#include <mutex>
#include <boost/bimap.hpp>
#include <boost/bimap/unordered_set_of.hpp>
#include <boost/bimap/unordered_multiset_of.hpp>
#include "parallel_index.h"


namespace bimaps = boost::bimaps;
typedef boost::bimap<bimaps::unordered_set_of<uint64_t>,
        bimaps::unordered_multiset_of<std::string> > bimap_reference;
typedef bimap_reference::value_type position;
bimap_reference reference_index_vector;


size_t total_threads = std::thread::hardware_concurrency();

std::string sequence_content = "ABCDDBACDDDCBBAAACBDAADCBDAAADCBDADADACBDDCBBBCDCBCDAADCBBCDAAADCBDA";
uint64_t sequence_length = sequence_content.length();
int split = 5;
uint64_t erase_length = 0;
unsigned int seq_itr = 0;

std::mutex mtx;   // to protect against concurent access
int main(){
    thread_test::create_index index;
    std::thread threads[total_threads-1];

    for(unsigned int i = 0; i < total_threads; i++){
        threads[i] = std::thread(&thread_test::create_index::reference_index_hash, std::ref(index), sequence_length, split, sequence_content, erase_length);
    }

    for(unsigned int i = 0; i < total_threads; i++){
        threads[i].join();
    }
}



/*
 * Creating index
 */
void thread_test::create_index::reference_index_hash(uint64_t &sequence_length, int &split,
        std::string &sequence_content, uint64_t &erase_length  ){

    for (; seq_itr < sequence_length; ++seq_itr ){
        std::string splitstr = sequence_content.substr(erase_length, split);

        std::lock_guard<std::mutex> lck(mtx);
        reference_index_vector.insert(position(seq_itr, splitstr));
        seq_itr += split-1;
        erase_length += split;


// the length of test is very large so erase some text
        if(erase_length > 10000){ 
            sequence_content.erase(0,erase_length);
            erase_length = 0;
        }
    }


//  for( bimap_reference::const_iterator iter = reference_index_vector.begin(), iend = reference_index_vector.end();
//          iter != iend; ++iter ) {
//      std::cout << iter->left << " <--> "<< iter->right <<std::endl;
//  }

}

parallel_index.h

#ifndef PARALLEL_INDEX_H_
#define PARALLEL_INDEX_H_


#include<iostream>
#include <algorithm>
#include <utility>
#include <boost/bimap.hpp>
#include <boost/bimap/unordered_set_of.hpp>
#include <boost/bimap/unordered_multiset_of.hpp>




//typedef boost::unordered_map<int, std::pair<int, unsigned long int>& > reference_map;

namespace bimaps = boost::bimaps;

typedef boost::bimap<bimaps::unordered_set_of<uint64_t>,
        bimaps::unordered_multiset_of<std::string > > bimap_reference;
typedef bimap_reference::value_type position;
extern bimap_reference reference_index_vector;


namespace thread_test{

class create_index{
public:
    void reference_index_hash(uint64_t &sequence_length, int &split,
            std::string &sequence_content, uint64_t &erase_length);
};
}


#endif /* PARALLEL_INDEX_H_ */

I use eclipse IDE to compile the code, but I get the errors

g++ -std=c++17 -I/home/ahussain/boost_1_65_1 -O3 -Wall -c -fmessage-length=0 -MMD -MP -MF"parallel_index.d" -MT"parallel_index.o" -o "parallel_index.o" "../parallel_index.cpp"
In file included from /usr/include/c++/5/thread:39:0,
                 from ../parallel_index.cpp:13:
/usr/include/c++/5/functional: In instantiation of ‘struct std::_Bind_simple<std::_Mem_fn<void (thread_test::create_index::*)(long unsigned int&, int&, std::__cxx11::basic_string<char>&, long unsigned int&)>(std::reference_wrapper<thread_test::create_index>, long unsigned int, int, std::__cxx11::basic_string<char>, long unsigned int)>’:
/usr/include/c++/5/thread:137:59:   required from ‘std::thread::thread(_Callable&&, _Args&& ...) [with _Callable = void (thread_test::create_index::*)(long unsigned int&, int&, std::__cxx11::basic_string<char>&, long unsigned int&); _Args = {std::reference_wrapper<thread_test::create_index>, long unsigned int&, int&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&, long unsigned int&}]’
../parallel_index.cpp:42:149:   required from here
/usr/include/c++/5/functional:1505:61: error: no type named ‘type’ in ‘class std::result_of<std::_Mem_fn<void (thread_test::create_index::*)(long unsigned int&, int&, std::__cxx11::basic_string<char>&, long unsigned int&)>(std::reference_wrapper<thread_test::create_index>, long unsigned int, int, std::__cxx11::basic_string<char>, long unsigned int)>’
       typedef typename result_of<_Callable(_Args...)>::type result_type;
                                                             ^
/usr/include/c++/5/functional:1526:9: error: no type named ‘type’ in ‘class std::result_of<std::_Mem_fn<void (thread_test::create_index::*)(long unsigned int&, int&, std::__cxx11::basic_string<char>&, long unsigned int&)>(std::reference_wrapper<thread_test::create_index>, long unsigned int, int, std::__cxx11::basic_string<char>, long unsigned int)>’
         _M_invoke(_Index_tuple<_Indices...>)
         ^
subdir.mk:18: recipe for target 'parallel_index.o' failed
make: *** [parallel_index.o] Error 1

I had a look here, but can't get away with the errors.


Solution

  • index can be passed as a simple pointer rather than a wrapped reference, but the remaining variables are expected to be references, so they need the std::ref treatment.

    In other words

    threads[i] = std::thread(&thread_test::create_index::reference_index_hash,
                             &index,
                             std::ref(sequence_length),
                             std::ref(split),
                             std::ref(sequence_content),
                             std::ref(erase_length));
    

    will compile.

    But...

    Unfortunately this means you'll have multiple threads all working off the same variables. This is usually a non-starter because the threads will be concurrently accessing and wreaking havoc on the values stored by the variables. You will have to rethink how you want to use these variables and either pass them by value or find a better way to isolate or protect them.

    And...

    std::thread threads[total_threads-1];
    

    combined with

    for(unsigned int i = 0; i < total_threads; i++)
    

    will result in one too many threads going into the array. 0 through total_threads is a count of total_threads threads, not total_threads-1.