python-3.x, multithreading, permutation

Memory Safe Permutations Generator with Increasing Length Value in ThreadPoolExecutor


Thanks to @rici in the comments for steering me in the right direction on this. I have discovered that concurrent.futures' Executor.map() and Executor.submit() process their input iterables immediately, whereas Python's built-in map() consumes iterables lazily, which is much more desirable when dealing with large product and permutation spaces. The concurrent.futures route uses up all available RAM once it gets to combos of length 2 or more in the example code below.
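
To illustrate the difference, here is a tiny standalone demo (separate from the scanner code below; identity() is just a placeholder function):

from concurrent.futures import ThreadPoolExecutor
from itertools import count

def identity(x):
    return x

# Built-in map() is lazy: nothing is computed until you iterate,
# so even an infinite iterator is fine here
lazy = map(identity, count())
print(next(lazy))  # 0

# Executor.map() submits a future for every input item up front, so the
# equivalent call would try to materialize the whole (here infinite)
# input and exhaust RAM:
# with ThreadPoolExecutor() as executor:
#     executor.map(identity, count())  # don't do this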

What I'd like to do now is take the updated code below and multithread it: run Python's built-in map() pattern across several threads, all pulling from one common product generator. I've left the "working" multithreaded example commented out for reference, to show what I was trying to accomplish.

I stumbled upon a potential fix in the main_lazy function from this post, but I'm confused about how to implement it with a function that returns 2 values. The maps and zips and lambdas confuse me here, and I'm not sure how the chunking would work with the space I'm working with, but maybe it'll make sense to someone else.
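
For what it's worth, here is my rough guess at what that chunked approach would look like adapted to my code (untested; the chunked() helper and the chunk size are my own guesses, not from that post):

from concurrent.futures import ThreadPoolExecutor
from itertools import islice, product

def chunked(iterable, size):
    # Pull `size` items at a time so only one chunk is ever held in memory
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk

def main_lazy_chunked(chunk_size=1000):
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        for i in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH + 1):
            combos = product(list_of_individual_api_endpoints, repeat=i)
            for chunk in chunked(combos, chunk_size):
                # Executor.map() is still eager, but only per chunk; tuple
                # unpacking handles the (status_code, new_endpoint) return
                for status_code, endpoint in executor.map(ping_current_api_endpoint, chunk):
                    if str(status_code) != "404":
                        print("Endpoint:", endpoint, ", Status Code:", status_code)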

For now, here is the single-threaded version of the memory-safe code that I'm trying to multithread.

Note that I don't care about the math behind how many combinations this generates, as it's irrelevant to my use case, so long as memory usage stays down. Here's the updated code.

To reproduce:

  1. Download VAmPI and start the server
  2. Update the BASE_URL in the code below to match your server
  3. Run this code
import concurrent.futures
from itertools import product
import requests
import urllib.request


# ---------------------------------------------------------------------------- #
#                                   Variables                                  #
# ---------------------------------------------------------------------------- #
MAX_ENDPOINT_PERMUTATION_LENGTH = 3
MAX_WORKERS = 6
# BASE_URL = 'http://localhost:5000/'
BASE_URL = 'http://172.16.1.82:5000/' # This should be the URL of the VAmPI
                                      # server on your machine
if BASE_URL[-1] != "/":
    BASE_URL = BASE_URL + "/"


# ---------------------------------------------------------------------------- #
#                   Retrieve list of endpoints to product'ize                  #
# ---------------------------------------------------------------------------- #
list_of_individual_api_endpoints = []
url = r"https://gist.githubusercontent.com/yassineaboukir/8e12adefbd505ef704674ad6ad48743d/raw/3ea2b7175f2fcf8e6de835c72cb2b2048f73f847/List%2520of%2520API%2520endpoints%2520&%2520objects"
file = urllib.request.urlopen(url)
for line in file:
    decoded_line = line.decode("utf-8").replace("\n","")
    list_of_individual_api_endpoints.append(decoded_line)


# ---------------------------------------------------------------------------- #
#                 The multithreaded function we're going to use                #
# ---------------------------------------------------------------------------- #
def ping_current_api_endpoint(endpoint):
    # Join the parts of the passed-in tuple into a proper endpoint path
    new_endpoint = "/".join(str(x) for x in endpoint)
    # Ping the endpoint to get a response code
    response = requests.get(BASE_URL + new_endpoint)
    status_code = response.status_code
    return status_code, new_endpoint
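
# Example (hypothetical tuple): ping_current_api_endpoint(("users", "v1"))
# requests BASE_URL + "users/v1" and returns something like (200, "users/v1")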


# # ---------------------------------------------------------------------------- #
# #                                 Main Function                                #
# # ---------------------------------------------------------------------------- #
# # MULTITHREADED ATTEMPT. EATS UP RAM WHEN GETTING TO DEPTH OF 2
# def main():
#     results_dict = {'endpoint': [], 'status_code': []}
#     # Start the thread pool
#     with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
#         # Try/except for a keyboard interrupt. If this is not the correct way
#         # to stop a multithreaded pool, please demonstrate the correct way
#         try:
#             # Iterate depth 1 through MAX_ENDPOINT_PERMUTATION_LENGTH
#             for i in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH+1):
#                 print("Checking endpoints with depth of", i)
#                 # Tried both executor.map() and executor.submit(); either
#                 # way the whole product() space gets materialized up front
#                 for status_code, endpoint in executor.map(
#                         ping_current_api_endpoint,
#                         product(list_of_individual_api_endpoints, repeat=i)):
#                     if str(status_code) != "404":
#                         results_dict['endpoint'].append(endpoint)
#                         results_dict['status_code'].append(status_code)
#                         print("Endpoint:", endpoint, ", Status Code:", status_code)
#         except KeyboardInterrupt:
#             print("Early stopping engaged...")
#     # Print the results dict
#     print(results_dict)


# LAZY MAP FUNCTION, SINGLE THREADED, THAT I'D LIKE TO TURN INTO MULTI
def main_lazy():
    results_dict = {'endpoint': [], 'status_code': []}
    for i in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH+1):
        print("Checking endpoints with depth of", i)
        # Built-in map() is lazy, so the product() space is never materialized
        results = map(ping_current_api_endpoint, product(list_of_individual_api_endpoints, repeat=i))
        for status_code, endpoint in results:
            if str(status_code) != "404":
                results_dict['endpoint'].append(endpoint)
                results_dict['status_code'].append(status_code)
                print("Endpoint:", endpoint, ", Status Code:", status_code)
    # Print the results dict
    print(results_dict)

# ---------------------------------------------------------------------------- #
#                                 Start Program                                #
# ---------------------------------------------------------------------------- #
if __name__ == "__main__":
    # main()
    main_lazy()

Solution

  • I figured out a solution. After the section of code that gets the endpoints list from GitHub, I use the following:

    # ---------------------------------------------------------------------------- #
    #                        Function to Ping API Endpoints                        #
    # ---------------------------------------------------------------------------- #
    # Imports not already pulled in by the question code above
    import threading, time
    from pandas import DataFrame

    # Create Thread Safe Class for Generator and Worker Function
    results_dict = {"endpoint": [], "status_code": []}
    class LockedIterator(object):
        # Plain generators are not thread safe: two threads calling next() on
        # the same generator at once can raise "generator already executing",
        # so every next() call is serialized through a lock
        def __init__(self, iterator):
            self.lock = threading.Lock()
            self.iterator = iter(iterator)
        def __iter__(self): return self
        def __next__(self):
            with self.lock:
                return next(self.iterator)
                
    def generator_function(repeat):
        # Lazily yield endpoint tuples; nothing is materialized up front
        for x in product(list_of_individual_api_endpoints, repeat=repeat):
            yield x
    
    def worker_function(current_gen_value):
        # Each worker pulls tuples from the shared thread-safe generator
        # until it is exhausted
        for endpoint in current_gen_value:
            new_endpoint = "/".join(endpoint)
            response = requests.get(BASE_URL + new_endpoint)
            status_code = response.status_code
            if str(status_code) != "404":
                results_dict['endpoint'].append(new_endpoint)
                results_dict['status_code'].append(status_code)
                print("Endpoint:", new_endpoint, ", Status Code:", status_code)
    
    
    # ---------------------------------------------------------------------------- #
    #                              Main Program Start                              #
    # ---------------------------------------------------------------------------- #
    start_time = time.time()
    
    for repeat in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH+1):
    
        thread_safe_generator = LockedIterator(generator_function(repeat))
    
        threads_list = []
        for _ in range(MAX_WORKERS):
            thread = threading.Thread(target=worker_function, args=(thread_safe_generator,))
            # thread.daemon = True
            threads_list.append(thread)
        for thread in threads_list:
            thread.start()
        for thread in threads_list:
            thread.join()
        
    results_df = DataFrame.from_dict(results_dict)
    results_df = results_df.sort_values(by='status_code', ascending=True).reset_index(drop=True)
    results_df.to_csv("endpoint_results.csv", index=False)
    print(results_df)
    print("Elapsed time:", int((time.time() - start_time) / 60), "minutes." )
    

    This creates a thread-safe, memory-safe generator and multiple worker threads. The only thing still missing is how to make CTRL + C work with this; a rough sketch of one approach is below.
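
    For completeness, here is a minimal standalone sketch of how CTRL + C could probably be wired in (untested; time.sleep and the plain iterator stand in for the real request and generator): workers poll a shared threading.Event, and the main thread joins with a timeout so the KeyboardInterrupt can actually land.

    import threading, time

    stop_event = threading.Event()
    shared_iterator = iter(range(10_000))    # stand-in for LockedIterator(generator_function(...))
    iterator_lock = threading.Lock()

    def interruptible_worker():
        while not stop_event.is_set():
            with iterator_lock:              # same serialization idea as LockedIterator
                item = next(shared_iterator, None)
            if item is None:                 # iterator exhausted, worker done
                return
            time.sleep(0.01)                 # stand-in for the requests.get() call

    threads_list = [threading.Thread(target=interruptible_worker) for _ in range(MAX_WORKERS)]
    for thread in threads_list:
        thread.start()
    try:
        # join() with a timeout keeps the main thread responsive; a plain
        # join() can block CTRL + C until a worker finishes on its own
        while any(thread.is_alive() for thread in threads_list):
            for thread in threads_list:
                thread.join(timeout=0.5)
    except KeyboardInterrupt:
        print("Early stopping engaged...")
        stop_event.set()                     # ask all workers to bail out
        for thread in threads_list:
            thread.join()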