
How to share a counter variable among threads using ThreadPoolExecutor and increment it?


Following is a thread pool executor that I have implemented in Python 3.x:

    with ThreadPoolExecutor(max_workers=15) as ex:
        f = open(filename, 'r', encoding='UTF-8')
        results = {ex.submit(callreadline, files): files for files in f.readlines()}

The results variable contains values in the following format:

words and their corresponding 200-dimensional embeddings

You can see that the values are tuples: the first element is a word and the second element is a 200-dimensional array. There are 400,000 values in total, so there are 400,000 tuples.
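
For concreteness, a toy version of one such tuple might look like this (the word and the values are invented for illustration; only the shape matters):

    import numpy as np

    # One (word, embedding) pair of the kind each future returns
    word, vector = 'the', np.random.rand(200)
    print(word, vector.shape)   # -> the (200,)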

Now what I want to do is create another thread pool executor that does the following tasks:

  1. Create an ordered dictionary of the first values in the tuple list. Say the words of the first four tuples are the, is, are, said; then the ordered dictionary will contain:

{the: 0, is: 1, are: 2, said: 3, ..., hello: 399999}

  2. Create a NumPy ndarray which contains the 200-dimensional array of each corresponding word in the ordered dictionary (by corresponding word I mean the first entry will be the 200-dimensional array of the word the, then the 200-dimensional array of is, and so on). So the NumPy ndarray will be of dimension 400000 * 200. A small sketch of both target structures follows this list.
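
To make the target structures concrete, here is a minimal sketch that builds both from a handful of made-up (word, vector) pairs; the words and numbers are invented, only the shapes matter:

    import numpy as np
    from collections import OrderedDict

    # Toy input: four (word, 200-dimensional vector) tuples
    pairs = [(w, np.random.rand(200)) for w in ('the', 'is', 'are', 'said')]

    # Ordered dictionary mapping each word to its position
    word_to_idx = OrderedDict((word, i) for i, (word, _) in enumerate(pairs))

    # Stack the vectors into a single (len(pairs), 200) ndarray
    vectors = np.array([vec for _, vec in pairs])

    print(word_to_idx)      # OrderedDict([('the', 0), ('is', 1), ('are', 2), ('said', 3)])
    print(vectors.shape)    # (4, 200)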

I was using a for loop with the following code:

    count = 0
    word_to_idx = OrderedDict()
    vectors = []
    for future in results:
        b = future.result()
        word_to_idx[count] = b[0]
        if count == 0:
            vectors = np.array([b[1]])
        else:
            vectors = np.append(vectors, np.array([b[1]]), axis=0)
        count = count + 1

At the end of the above function I returned word_to_idx and vectors, which did the job. However, looping over the 400,000 tuples and assigning them one by one took extremely long (about 10 hours).
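
For what it's worth, the time is likely dominated by np.append rather than by the iteration itself, since np.append returns a brand-new array and copies all existing rows on every call. A minimal sketch contrasting the two patterns, with made-up sizes:

    import numpy as np

    n, dim = 10000, 200                      # made-up sizes for illustration
    rows = [np.random.rand(dim) for _ in range(n)]

    # Slow: np.append returns a new array and copies every existing row on each call
    acc = np.empty((0, dim))
    for row in rows:
        acc = np.append(acc, [row], axis=0)

    # Fast: collect rows in a Python list and convert once at the end
    fast = np.array(rows)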

So I was wondering whether there is a way to parallelize this functionality as well using a thread pool executor.

I was thinking of creating threads that share a counter variable, with each thread getting access to the shared variable one at a time. A thread would then increment the counter, and then another thread would access the incremented value. Could someone point me in the right direction?
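
For reference, a thread-safe shared counter in Python usually means protecting the increment with a threading.Lock. Here is a minimal sketch of that pattern (process_item and the sample items are hypothetical; this only illustrates the counter, it is not a fix for the slowdown above):

    import threading
    from concurrent.futures import ThreadPoolExecutor

    counter = 0
    counter_lock = threading.Lock()

    def process_item(item):
        """Hypothetical worker: grab the next index under the lock, then do work."""
        global counter
        with counter_lock:          # only one thread touches the counter at a time
            idx = counter
            counter += 1
        return idx, item

    with ThreadPoolExecutor(max_workers=15) as ex:
        results = list(ex.map(process_item, ['a', 'b', 'c', 'd']))

    print(results)   # each item gets a unique index 0-3; the pairing may vary between runs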

Edit:

Here is the callreadline function, which works really fast as it is called with 15 workers:

    def callreadline(line):
        word_to_idx = OrderedDict()
        vectors = []
        vocabulary = None
        word_to_idx = read_w2v_word(line.split(' ')[0])
        try:
            vectors = np.append(vectors, [np.array(line.split(' ')[1:])], axis=0)
        except:
            vectors = np.array(line.split(' ')[1:], dtype=float)
        if vocabulary is not None:
            word_to_idx, vectors = filter_words(word_to_idx, vectors, vocabulary)
        return word_to_idx, vectors

Solution

  • I have a feeling the callreadline function also isn't even close to being as fast as it could be, but it wasn't part of the question, so let me try to fix the rest for you:

    with ThreadPoolExecutor(max_workers=15) as ex:
        f = open(filename, 'r', encoding='UTF-8')
        results = [ex.submit(callreadline, files) for files in f.readlines()]

    word_to_idx = dict()
    vectors = []
    for count, future in enumerate(results):
        b = future.result()
        word_to_idx[b[0]] = count
        vectors.append(b[1])

    vectors = np.array(vectors)
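
A note on the design, not from the original answer: keying the dictionary by the word rather than by the counter matches the {word: index} layout described in the question, and iterating the futures in submission order keeps each word's index aligned with its row in vectors. The submit/enumerate bookkeeping can also be expressed with executor.map, which yields results in input order; a minimal sketch of that variant, assuming the same callreadline and filename as above:

    import numpy as np
    from concurrent.futures import ThreadPoolExecutor

    with open(filename, 'r', encoding='UTF-8') as f, \
         ThreadPoolExecutor(max_workers=15) as ex:
        # map() yields (word, vector) tuples in the same order as the lines
        pairs = list(ex.map(callreadline, f))

    word_to_idx = {word: i for i, (word, _) in enumerate(pairs)}
    vectors = np.array([vec for _, vec in pairs])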