Search code examples
python, multithreading, python-multithreading, partitioning

Trying to add threading to python function but it isn't decreasing run time


Here is the code that I am working with. My goal is for it to split each chunk into smaller parts, one per thread, which should give better load balancing across the threads. I am new to threading in Python, so I am not sure whether what I am doing is optimal.

Here is the code I have created:

#so far this is the most optimal
import heapq
import re
import time
import psutil
from collections import defaultdict
import requests
import tracemalloc
from concurrent.futures import ThreadPoolExecutor
import os
# Load stop words
stop_words_url = "https://gist.githubusercontent.com/sebleier/554280/raw/7e0e4a1ce04c2bb7bd41089c9821dbcf6d0c786c/NLTK's%2520list%2520of%2520english%2520stopwords"
# A timeout keeps a stalled connection from hanging the import indefinitely,
# and raise_for_status() prevents an HTTP error page from being split into
# "stop words" and silently skewing every later count.
_stop_words_response = requests.get(stop_words_url, timeout=30)
_stop_words_response.raise_for_status()
# Whitespace-separated tokens of the response body, stored as a set so that
# membership tests in count_words() are O(1).
stop_words = set(_stop_words_response.text.split())

def tokenize(line):
    """Split *line* into word tokens.

    A token is a maximal run of ``\\w`` characters delimited by word
    boundaries; punctuation and whitespace are discarded.

    Returns:
        A list of the matched substrings, in order of appearance.
    """
    matches = re.finditer(r'\b\w+\b', line)
    return [match.group(0) for match in matches]

def count_words(chunk):
    """Tally the non-stop-word tokens found in the lines of *chunk*.

    Each line is split with ``tokenize``. A token is skipped when its
    lower-cased form is in the module-level ``stop_words`` set; otherwise it
    is counted under its original spelling, so differently-cased variants of
    the same word get separate entries.

    Args:
        chunk: An iterable of text lines.

    Returns:
        A ``defaultdict(int)`` mapping each counted token to its frequency.
    """
    tally = defaultdict(int)
    for text in chunk:
        for token in tokenize(text):
            if token.lower() in stop_words:
                continue
            tally[token] += 1
    return tally


def top_k_words(word_count, k):
    """Return the *k* most frequent entries of *word_count*.

    Args:
        word_count: A mapping from word to its count.
        k: Maximum number of entries to return.

    Returns:
        A list of ``(word, count)`` tuples ordered from highest to lowest
        count, containing at most *k* items.
    """
    def by_count(entry):
        # entry is a (word, count) pair; rank on the count.
        return entry[1]

    pairs = word_count.items()
    return heapq.nlargest(k, pairs, key=by_count)

def _merge_chunk_counts(executor, chunk, num_threads, word_count):
    """Count the words of *chunk* on *executor* and add them into *word_count*."""
    # Stride slicing hands every worker an interleaved share of the lines.
    # A list (not a set) keeps the merge order, and therefore the tie-breaking
    # among equally frequent words, deterministic.
    futures = [
        executor.submit(count_words, chunk[offset::num_threads])
        for offset in range(num_threads)
    ]
    for future in futures:
        for word, count in future.result().items():
            word_count[word] += count


def analyze_performance(file_path, k=10, chunk_size=10000, num_threads=os.cpu_count()):
    """Count word frequencies in a text file and report timing/resource figures.

    The file is read line by line. Every *chunk_size* lines are split across
    *num_threads* worker tasks that run ``count_words``, and the partial
    counts are merged into one total. The top *k* words, the elapsed time and
    the CPU usage are printed.

    Args:
        file_path: Path of the UTF-8 text file to analyse.
        k: Number of most frequent words to print.
        chunk_size: Number of lines buffered before they are dispatched.
        num_threads: Number of worker threads and of slices per chunk.

    Returns:
        A tuple ``(elapsed_time, cpu_percent, peak)``: wall-clock seconds,
        the value reported by ``psutil.cpu_percent()`` after the run, and the
        peak traced memory in bytes reported by ``tracemalloc``.
    """
    # os.cpu_count() (the default) may return None, which range() cannot take.
    if num_threads is None:
        num_threads = 1

    # Non-blocking priming call: psutil measures CPU usage since the previous
    # call, so this makes the reading taken at the end cover exactly this run.
    psutil.cpu_percent()

    start_time = time.time()
    tracemalloc.start()
    try:
        word_count = defaultdict(int)
        # One pool for the whole file: creating and joining a pool per chunk
        # adds thread start-up/tear-down cost to every chunk.
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            with open(file_path, 'r', encoding='utf-8') as file:
                chunk = []
                for line in file:
                    chunk.append(line)
                    if len(chunk) == chunk_size:
                        _merge_chunk_counts(executor, chunk, num_threads, word_count)
                        chunk = []
                # Trailing lines that did not fill a complete chunk.
                if chunk:
                    _merge_chunk_counts(executor, chunk, num_threads, word_count)

        top_k = top_k_words(word_count, k)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        # Always stop tracing, even if reading or counting raised.
        tracemalloc.stop()
    end_time = time.time()

    elapsed_time = end_time - start_time

    cpu_percent = psutil.cpu_percent()

    print(f"Top {k} words: {top_k}")
    print(f"Elapsed time: {elapsed_time:.2f} seconds")
    print(f"CPU usage: {cpu_percent}%")
    return elapsed_time, cpu_percent, peak

if __name__ == "__main__":
    # Imported here because this benchmark driver is the only user of pandas.
    import pandas as pd

    file_paths = [
        "small_50MB_dataset.txt",
    ]
    # Chunk sizes are numbers of lines per batch (not bytes).
    chunk_sizes = [10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000]
    # os.cpu_count() may return None; benchmark a single thread in that case.
    max_threads = os.cpu_count() or 1

    # One row per (chunk_size, num_threads) combination.
    results = []

    for file_path in file_paths:
        print(f"Processing {file_path}")
        for chunk_size in chunk_sizes:
            for num_threads in range(1, max_threads + 1):
                print("Chunk size:", chunk_size, "lines,", "threads:", num_threads)
                elapsed_time, cpu_usage, memory_usage = analyze_performance(
                    file_path, chunk_size=chunk_size, num_threads=num_threads
                )
                print("\n")

                results.append({
                    "chunk_size": chunk_size,
                    "num_threads": num_threads,
                    "elapsed_time": elapsed_time,
                    "cpu_usage": cpu_usage,
                    # Peak traced bytes converted to megabytes (10**6 bytes).
                    "memory_usage": float(memory_usage / 10**6),
                })

    # Create pandas DataFrame from the results
    df2 = pd.DataFrame(results)

and here was the output:

+----+--------------+---------------+----------------+-------------+----------------+
|    |   chunk_size |   num_threads |   elapsed_time |   cpu_usage |   memory_usage |
|----+--------------+---------------+----------------+-------------+----------------|
| 40 |       10     |             1 |        51.2827 |        24.8 |        8.01863 |
| 41 |       10     |             2 |        60.1906 |        65.5 |        8.3454  |
| 42 |      100     |             1 |        32.4096 |        64.4 |        8.11009 |
| 43 |      100     |             2 |        33.402  |        60   |        8.16907 |
| 44 |     1000     |             1 |        25.7621 |        62.5 |        8.48084 |
| 45 |     1000     |             2 |        31.2087 |        65   |        9.02304 |
| 46 |    10000     |             1 |        24.5674 |        70.6 |       12.702   |
| 47 |    10000     |             2 |        23.1408 |        63.7 |       13.9474  |
| 48 |   100000     |             1 |        19.4707 |        58.7 |       43.1203  |
| 49 |   100000     |             2 |        21.5641 |        64.6 |       42.0958  |
| 50 |        1e+06 |             1 |        21.23   |        61.9 |       99.1393  |
| 51 |        1e+06 |             2 |        21.2195 |        60.7 |      104.215   |
| 52 |        1e+07 |             1 |        21.5565 |        64.3 |       99.153   |
| 53 |        1e+07 |             2 |        22.712  |        66.1 |      104.216   |
| 54 |        1e+08 |             1 |        20.8239 |        61.9 |       99.1389  |
| 55 |        1e+08 |             2 |        22.5298 |        63.9 |      104.217   |
| 56 |        1e+09 |             1 |        21.4913 |        64.3 |       99.1535  |
| 57 |        1e+09 |             2 |        20.9633 |        58.6 |      104.232   |
| 58 |        1e+10 |             1 |        21.4864 |        64.6 |       99.1389  |
| 59 |        1e+10 |             2 |        22.0327 |        63.9 |      104.216   |
+----+--------------+---------------+----------------+-------------+----------------+

Any advice on where to improve or what to change to get a more optimal solution would be greatly appreciated.


Solution

  • You cannot use threads in a (standard, GIL-based) Python program to run CPU-bound Python code in parallel.

    Python was invented way back when computers only had one CPU core each. The architect of the language decided to use a global mutex, known as the GIL (Global Interpreter Lock), to protect all of the interpreter's data structures. There was no downside to the GIL back in the day, but now, in the age of multi-core CPUs, it means that no two threads can ever execute Python statements in parallel.

    Unfortunately, the existence of the GIL shapes Python's un-specified memory model. There's no way to remove the GIL from Python without potentially breaking existing Python programs.

    If you want your program to execute statements in parallel, you'll have to do it via the multiprocessing module, not the threading module.