Here is the code that I am working with. My goal is to divide each chunk into smaller parts based on the number of threads, which should provide better load balancing across the threads. I am new to using threading in Python, so I am not sure whether what I am doing is optimal.
Here is the code I have created:
#so far this is the most optimal
import heapq
import re
import time
import psutil
from collections import defaultdict
import requests
import tracemalloc
from concurrent.futures import ThreadPoolExecutor
import os
# Load the NLTK English stop-word list from a public gist (one word per token).
stop_words_url = "https://gist.githubusercontent.com/sebleier/554280/raw/7e0e4a1ce04c2bb7bd41089c9821dbcf6d0c786c/NLTK's%2520list%2520of%2520english%2520stopwords"
# timeout prevents an indefinite hang if the host is unreachable;
# raise_for_status surfaces an HTTP error instead of silently parsing an error page.
_response = requests.get(stop_words_url, timeout=10)
_response.raise_for_status()
stop_words = set(_response.text.split())
def tokenize(line):
    """Split *line* into word tokens (maximal runs of \\w characters)."""
    return [match.group(0) for match in re.finditer(r'\b\w+\b', line)]
def count_words(chunk):
    """Count occurrences of each non-stop-word token across the lines in *chunk*.

    Tokens are compared against the stop-word list case-insensitively, but
    counted under their original casing. Returns a defaultdict(int) mapping
    word -> count.
    """
    counts = defaultdict(int)
    for text_line in chunk:
        for token in tokenize(text_line):
            if token.lower() in stop_words:
                continue
            counts[token] += 1
    return counts
def top_k_words(word_count, k):
    """Return the *k* (word, count) pairs with the largest counts."""
    by_count = lambda pair: pair[1]
    return heapq.nlargest(k, word_count.items(), key=by_count)
def analyze_performance(file_path, k=10, chunk_size=10000, num_threads=os.cpu_count()):
    """Count words in *file_path* using a thread pool and report performance.

    The file is read in chunks of *chunk_size* lines; each chunk is split
    round-robin across *num_threads* workers. Note that because of the GIL,
    threads will not speed up this CPU-bound tokenizing work — see the answer
    below about multiprocessing.

    Returns (elapsed_time_seconds, cpu_percent, peak_traced_bytes).
    """
    start_time = time.time()
    tracemalloc.start()
    word_count = defaultdict(int)

    def _process_chunk(chunk, executor):
        # Fan a chunk out round-robin across the workers, then merge the
        # partial counts into the shared total.
        futures = [executor.submit(count_words, chunk[i::num_threads])
                   for i in range(num_threads)]
        for future in futures:
            for word, count in future.result().items():
                word_count[word] += count

    # Create the pool ONCE and reuse it for every chunk; the original code
    # built and tore down a fresh ThreadPoolExecutor per chunk, which is
    # pure overhead.
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        with open(file_path, 'r', encoding='utf-8') as file:
            chunk = []
            for line in file:
                chunk.append(line)
                if len(chunk) == chunk_size:
                    _process_chunk(chunk, executor)
                    chunk = []
            # Flush the final, partially-filled chunk.
            if chunk:
                _process_chunk(chunk, executor)

    top_k = top_k_words(word_count, k)
    current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    end_time = time.time()
    elapsed_time = end_time - start_time
    # NOTE: psutil.cpu_percent() with no prior call/interval reports usage
    # since the last call (or a meaningless value on the first call).
    cpu_percent = psutil.cpu_percent()
    print(f"Top {k} words: {top_k}")
    print(f"Elapsed time: {elapsed_time:.2f} seconds")
    print(f"CPU usage: {cpu_percent}%")
    return elapsed_time, cpu_percent, peak
if __name__ == "__main__":
    # Local import: pandas is only needed for the final results table.
    import pandas as pd

    file_paths = [
        "small_50MB_dataset.txt",
    ]
    chunk_sizes = [10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000]
    # FIX: `results` was never initialized in the original, causing a
    # NameError on the first append.
    results = []
    for file_path in file_paths:
        print(f"Processing {file_path}")
        for num_threads in range(1, os.cpu_count() + 1):
            for chunk_size in chunk_sizes:
                # chunk_size is a LINE count, not bytes — the original print
                # divided it by 1024*1024 and labeled it "MB", which was
                # misleading.
                print("Chunk size (lines):", chunk_size)
                elapsed_time, cpu_usage, memory_usage = analyze_performance(
                    file_path, chunk_size=chunk_size, num_threads=num_threads)
                print("\n")
                results.append({
                    "chunk_size": chunk_size,
                    "num_threads": num_threads,
                    "elapsed_time": elapsed_time,
                    "cpu_usage": cpu_usage,
                    # peak traced bytes -> MB
                    "memory_usage": float(memory_usage / 10**6),
                })
    # Create a pandas DataFrame from the collected results.
    df2 = pd.DataFrame(results)
and here was the output:
+----+--------------+---------------+----------------+-------------+----------------+
| | chunk_size | num_threads | elapsed_time | cpu_usage | memory_usage |
|----+--------------+---------------+----------------+-------------+----------------|
| 40 | 10 | 1 | 51.2827 | 24.8 | 8.01863 |
| 41 | 10 | 2 | 60.1906 | 65.5 | 8.3454 |
| 42 | 100 | 1 | 32.4096 | 64.4 | 8.11009 |
| 43 | 100 | 2 | 33.402 | 60 | 8.16907 |
| 44 | 1000 | 1 | 25.7621 | 62.5 | 8.48084 |
| 45 | 1000 | 2 | 31.2087 | 65 | 9.02304 |
| 46 | 10000 | 1 | 24.5674 | 70.6 | 12.702 |
| 47 | 10000 | 2 | 23.1408 | 63.7 | 13.9474 |
| 48 | 100000 | 1 | 19.4707 | 58.7 | 43.1203 |
| 49 | 100000 | 2 | 21.5641 | 64.6 | 42.0958 |
| 50 | 1e+06 | 1 | 21.23 | 61.9 | 99.1393 |
| 51 | 1e+06 | 2 | 21.2195 | 60.7 | 104.215 |
| 52 | 1e+07 | 1 | 21.5565 | 64.3 | 99.153 |
| 53 | 1e+07 | 2 | 22.712 | 66.1 | 104.216 |
| 54 | 1e+08 | 1 | 20.8239 | 61.9 | 99.1389 |
| 55 | 1e+08 | 2 | 22.5298 | 63.9 | 104.217 |
| 56 | 1e+09 | 1 | 21.4913 | 64.3 | 99.1535 |
| 57 | 1e+09 | 2 | 20.9633 | 58.6 | 104.232 |
| 58 | 1e+10 | 1 | 21.4864 | 64.6 | 99.1389 |
| 59 | 1e+10 | 2 | 22.0327 | 63.9 | 104.216 |
+----+--------------+---------------+----------------+-------------+----------------+
Any advice on where to improve, or what to change to get a more optimal solution, would be greatly appreciated.
You cannot use threads in a Python program for parallel CPU-bound processing like this. (Threads do still help for I/O-bound work, where the GIL is released while waiting.)
Python was invented way back when computers only had one CPU core each. The architect of the language decided to use a global mutex, known as the GIL (Global Interpreter Lock) to protect all of the interpreter's data structures. There was no down-side to the GIL back in the day, but now, in the age of multi-processor CPUs, it means that no two threads can ever execute Python statements in parallel.
Unfortunately, the existence of the GIL shapes Python's unspecified memory model. There is no easy way to remove the GIL from Python without potentially breaking existing Python programs.
If you want your program to execute statements in parallel, you'll have to do it via the `multiprocessing` module, not the `threading` module.