Search code examples
pythonoptimizationnested-loopsrandom-walk

Is there a way to make the code exploring Random Walk in 1 D space more optimal? (Without nested for loops)


Trying to explore the Random Walk algorithm on Infinite Line and I'm looking for a way to make it as optimal as possible. Here is the code:

import random
from collections import Counter

def run(initial_pos,iterations,trials):
    final_pos = []
    for i in range (0,trials):
        pos = initial_pos
        for j in range (0,iterations):
            if random.choice(["left","right"]) == "left":
                pos -= 1
            else:
                pos += 1
        final_pos.append(pos)
    return Counter(final_pos)

Variable iterations indicates the number of repetitions in a single walk.
While trials indicates the number of walks.

The runtime is satisfactory for trials and iterations equals 10^4
Hwever, increasing to 10^5 requires long waiting time.


Solution

  • The Low-hanging Fruit

    First of all, let's get rid of those magic number and strings, and define some named constants:

    RIGHT_NAME, LEFT_NAME = "right", "left"
    RIGHT, LEFT = 1, -1
    

    Now, we can rewrite the inner loop body to use those constants:

    if random.choice([LEFT_NAME , RIGHT_NAME]) == LEFT_NAME:
        pos += LEFT
    else:
        pos += RIGHT
    

    Looking at that, two issues become apparent:

    1. We're comparing strings for equality. That is unlikely to perform better than comparing small integers.
    2. We have two pairs of constants representing the same concept (choice between left and right). We're repeating ourselves, and that's rarely a good thing.

    Let's eliminate both of those issues by just using the integer constants RIGHT and LEFT. Let's also store the result of random.choice into a temporary to better see the next issue:

    current_direction = random.choice([LEFT, RIGHT])
    if current_direction == LEFT:
        pos += LEFT
    else:
        pos += RIGHT
    

    Now, we can see that current_direction can be either RIGHT or LEFT. If it's LEFT, then we add LEFT to pos, otherwise (the only other choice is RIGHT), we add RIGHT to pos. In other words:

    current_direction = random.choice([LEFT, RIGHT])
    if current_direction == LEFT:
        pos += current_direction   # current_direction is LEFT
    else:
        pos += current_direction   # current_direction is RIGHT
    

    Same thing happens in both branches, so let's get rid of the condition:

    pos += random.choice([LEFT, RIGHT])
    

    Here is our new starting point:

    def run(initial_pos, iterations, trials):
        final_pos = []
        RIGHT, LEFT = 1, -1
        for i in range(0, trials):
            pos = initial_pos
            for j in range(0, iterations):
                pos += random.choice([LEFT, RIGHT])
            final_pos.append(pos)
        return Counter(final_pos)
    

    Now, if we look at the code critically (and having the low-level details of the byte-code and interpreter in mind -- the dis module helps here) we can see some other deficiencies:

    1. The call range(0, trials) is functionally identical to range(trials). However, that redundant 0 means an extra opcode to push the constant onto the stack. We don't want to waste time on useless things, right?
    2. In our inner-most loop, we're calling a function with argument [LEFT, RIGHT]. Python doesn't optimize such things, so that means we're creating the same list iterations * trials times. Even though it's just 3 opcodes, let's do it once, and then reuse the same list. We might as well make it a tuple, it doesn't need to change.
    3. The class collections.Counter supports updates. So let's avoid the intermediate final_pos list, and update the Counter directly.
    4. If we're hunting cycles, we can also note that a call to random.choice involves two opcodes (first get random, then find choice). We can cache the actual functions to call in local variables to avoid the extra step.
    def run_v1(initial_pos, iterations, trials):
        RIGHT, LEFT = 1, -1
        CHOICES = (RIGHT, LEFT)
       
        result = Counter()
        
        random_choice = random.choice
        result_update = result.update
        
        for i in range(trials):
            pos = initial_pos
            for j in range(iterations):
                pos += random_choice(CHOICES)
            result_update([pos])
    
        return result
    

    Those initial changes only mean the code runs in about 90%-95% of the time needed by the original, but they give us a solid basis for further optimizations.

    Eliminating Inner Loop — Attempt 1

    Right now our inner loop basically makes a sum of a list of random choices, offset by initial_pos. Let's use random.choices instead to generate iterations choices in one call, and the builtin sum to add them up.

    def run_v2(initial_pos, iterations, trials):
        RIGHT, LEFT = 1, -1
        CHOICES = (RIGHT, LEFT)
        
        result = Counter()
        
        random_choices = random.choices
        result_update = result.update
        
        for i in range(trials):
            result_update([initial_pos + sum(random_choices(CHOICES, k=iterations))])
    
        return result
    

    This requires roughly 25% of the time required by run_v1.

    Eliminating Inner Loop — Attempt 2

    The main problem with the previous version is that it ends up allocating a lot of intermediate Python objects (one for each choice). A more efficient way of using the memory would be to use numpy arrays and various functions the library provides. For example, we could use numpy.Generator.choice and the sum method of numpy arrays:

    def run_v3(initial_pos, iterations, trials):
        RIGHT, LEFT = 1, -1
        CHOICES = (RIGHT, LEFT)
        
        result = Counter()
        
        rng = np.random.default_rng()
        rng_choice = rng.choice
        result_update = result.update
        
        for i in range(trials):
            result_update([initial_pos + rng_choice(CHOICES, size=iterations).sum()])
            
        return result
    

    This version requires around 20% of the time required by run_v2.

    More Efficient Choices

    Currently, the random choices require an indirect lookup to map into our desired set of (1, -1). Let's observe that iterations = right_count + left_count. We already know the value of iterations, so as long as we know one of right_count or left_count, we can calculate the other.

    Therefore pos_offset = 2 * right_count - iterations, and we can implement it as such:

    def run_v4(initial_pos, iterations, trials):
        result = Counter()
        
        rng = np.random.default_rng()
        rng_choice = rng.choice
        result_update = result.update
        
        for i in range(0, trials):
            result_update([initial_pos + 2 * rng_choice(2, size=iterations).sum() - iterations])
    
        return result
    

    This time it requires about 60%-90% of run_v3.


    Further Variations

    Using np.random.default_rng().integers instead.

    def run_v5a(initial_pos, iterations, trials):
        final_pos = []
        rng = np.random.default_rng()
        rng_integers = rng.integers
        for i in range(0, trials):
            pos = initial_pos + 2 * rng_integers(2, size=iterations).sum() - iterations
            final_pos.append(pos)
        return Counter(final_pos)
    
    def run_v5b(initial_pos, iterations, trials):
        final_pos = []
        rng = np.random.default_rng()
        rng_integers = rng.integers
        for i in range(0, trials):
            pos = initial_pos + 2 * rng_integers(2, dtype=np.uint8, size=iterations).sum(dtype=np.int32) - iterations
            final_pos.append(pos)
        return Counter(final_pos)
    

    Using more memory to eliminate the upper loop.

    def run_v6a(initial_pos, iterations, trials):
        rng = np.random.default_rng()
        final_pos = initial_pos + 2 * rng.integers(2, size=(trials, iterations)).sum(axis=1) - iterations
        return Counter(final_pos)
    
    def run_v6b(initial_pos, iterations, trials):
        rng = np.random.default_rng()
        final_pos = initial_pos + 2 * rng.integers(2, dtype=np.uint8, size=(trials, iterations)).sum(axis=1,
            dtype=np.int32) - iterations
        return Counter(final_pos)
    

    As suggested by Jérôme Richard sampling binomial distribution.

    Applying math and using np.random.binomial.

    def run_v7(initial_pos, iterations, trials):
        final_pos = initial_pos + 2 * np.random.binomial(iterations, 0.5, trials) - iterations
        return Counter(final_pos)
    

    This runs in 1/4 second for 10^6 iterations and trials.