Tags: parallel-processing, mpi, openmp, cython, mpi4py

Parallelize the "for loop" iterations in Python using OpenMP


I want to parallelize the "for loop" iterations using OpenMP threads or a similar technique in Python. The code is shown below. idxes holds 1024 indices; for each index i, the loop reads self._storage[i] into data and appends its fields to per-field lists.

Is there an OpenMP-style technique in Python that I can use to speed up this operation?

Code:

def _encode_sample(self, idxes):
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    for i in idxes:
        data = self._storage[i]
        obs_t, action, reward, obs_tp1, done = data
        obses_t.append(np.array(obs_t, copy=False))
        actions.append(np.array(action, copy=False))
        rewards.append(reward)
        obses_tp1.append(np.array(obs_tp1, copy=False))
        dones.append(done)
    return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)

I want the for loop iterations to be executed in parallel across workers, as sketched in the diagram from the original post (image not reproduced here).
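Note that plain Python threads (and hence OpenMP-style threading) will not speed this loop up, because the interpreter's GIL serializes pure-Python bytecode; real parallelism here means separate processes. As a point of comparison, here is a minimal process-based sketch using the standard library's multiprocessing.Pool -- the helper _gather_chunk and the worker count are illustrative, not part of the original code:

    import numpy as np
    from multiprocessing import Pool

    def _gather_chunk(args):
        # Illustrative helper: fetch the requested transitions from storage.
        storage, idxes = args
        return [storage[i] for i in idxes]

    def encode_sample_mp(storage, idxes, workers=4):
        # One chunk of indices per worker process.
        chunks = np.array_split(idxes, workers)
        with Pool(workers) as pool:  # needs the usual __main__ guard on spawn platforms
            parts = pool.map(_gather_chunk, [(storage, c) for c in chunks])
        # Flatten the per-worker results back into one list of transitions.
        return [row for part in parts for row in part]

Each task pickles storage on its way to the workers, so this only pays off when the per-item work outweighs the serialization cost -- which is the overhead Ray's shared object store is designed to reduce.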

Code after using Ray (following the answer from @cade below):

import numpy as np
import random
import ray
import psutil

num_cpus = psutil.cpu_count(logical=False)
ray.init(num_cpus=num_cpus)  # use all physical cores; num_cpus=1 would serialize the tasks



class ReplayBuffer(object):
    def __init__(self, size):
        """Create Prioritized Replay buffer.

        Parameters
        ----------
        size: int
            Max number of transitions to store in the buffer. When the buffer
            overflows the old memories are dropped.
        """
        self._storage = []
        self._maxsize = int(size)
        self._next_idx = 0

    def __len__(self):
        return len(self._storage)

    def clear(self):
        self._storage = []
        self._next_idx = 0

    def add(self, obs_t, action, reward, obs_tp1, done):
        data = (obs_t, action, reward, obs_tp1, done)
        

        if self._next_idx >= len(self._storage):
            self._storage.append(data)
        else:
            self._storage[self._next_idx] = data
        self._next_idx = (self._next_idx + 1) % self._maxsize

    def _encode_sample(self, idxes):
        n = 256
        # Split idxes into chunks of at most n indices (the last chunk may be shorter).
        split_idxes = [idxes[i * n:(i + 1) * n] for i in range((len(idxes) + n - 1) // n)]
        futures = []

        for subrange in split_idxes:
            futures.append(_encode_sample_helper.remote(self._storage, subrange))

        obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
        outputs = ray.get(futures)

        for a, b, c, d, e in outputs:
            obses_t.extend(a)
            actions.extend(b)
            rewards.extend(c)
            obses_tp1.extend(d)
            dones.extend(e)

        return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)



    def make_index(self, batch_size):
        return [random.randint(0, len(self._storage) - 1) for _ in range(batch_size)]
        

    def make_latest_index(self, batch_size):
        idx = [(self._next_idx - 1 - i) % self._maxsize for i in range(batch_size)]
        np.random.shuffle(idx)
        return idx

    def sample_index(self, idxes):
        return self._encode_sample(idxes)

    def sample(self, batch_size):
        """Sample a batch of experiences.

        Parameters
        ----------
        batch_size: int
            How many transitions to sample.

        Returns
        -------
        obs_batch: np.array
            batch of observations
        act_batch: np.array
            batch of actions executed given obs_batch
        rew_batch: np.array
            rewards received as results of executing act_batch
        next_obs_batch: np.array
            next set of observations seen after executing act_batch
        done_mask: np.array
            done_mask[i] = 1 if executing act_batch[i] resulted in
            the end of an episode and 0 otherwise.
        """
        if batch_size > 0:
            idxes = self.make_index(batch_size)
        else:
            idxes = range(0, len(self._storage))
        return self._encode_sample(idxes)

    def collect(self):
        return self.sample(-1)

@ray.remote(num_cpus=1)
def _encode_sample_helper(_storage, subrange):
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    for i in subrange:
        data = _storage[i]
        obs_t, action, reward, obs_tp1, done = data
        obses_t.append(np.array(obs_t, copy=False))
        actions.append(np.array(action, copy=False))
        rewards.append(reward)
        obses_tp1.append(np.array(obs_tp1, copy=False))
        dones.append(done)
    return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)
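
To check whether the parallel version actually helps, a quick timing harness along these lines can be run against the class above (the buffer size and the dummy transition shapes are made up for illustration):

import time

buf = ReplayBuffer(size=100_000)
for _ in range(100_000):
    buf.add(np.zeros(84), 0, 0.0, np.zeros(84), False)  # dummy transitions

idxes = buf.make_index(1024)
start = time.perf_counter()
buf.sample_index(idxes)
print(f"_encode_sample over {len(idxes)} indices took {time.perf_counter() - start:.4f}s")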

Solution

  • I am answering with Ray since you allowed for "OpenMP threads or similar".

    Ray makes parallelizing Python really easy -- you could do what you want with the following (assuming you have four cores on your machine):

    import numpy as np
    import ray

    def _encode_sample(self, idxes):
        # One chunk of indices per core.
        split_idxes = np.array_split(idxes, 4)
        futures = []
        for subrange in split_idxes:
            futures.append(_encode_sample_helper.remote(self._storage, subrange))
    
        obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
        outputs = ray.get(futures)
    
        for a, b, c, d, e in outputs:
            obses_t.extend(a)
            actions.extend(b)
            rewards.extend(c)
            obses_tp1.extend(d)
            dones.extend(e)
    
        return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)
    
    
    @ray.remote(num_cpus=1)
    def _encode_sample_helper(storage, subrange):
        obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
        for i in subrange:
            data = storage[i]
            obs_t, action, reward, obs_tp1, done = data
            obses_t.append(np.array(obs_t, copy=False))
            actions.append(np.array(action, copy=False))
            rewards.append(reward)
            obses_tp1.append(np.array(obs_tp1, copy=False))
            dones.append(done)
        return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)
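
    One caveat with the sketch above: passing storage directly into .remote() makes Ray serialize the whole buffer on every _encode_sample call. When the buffer is large, it is usually cheaper to place it in the object store once with ray.put and pass the resulting reference to the tasks (a sketch; the reference has to be refreshed whenever the storage contents change):

        # Store the buffer in Ray's object store once; each task then receives
        # the reference and Ray resolves it to the data without re-pickling.
        storage_ref = ray.put(self._storage)

        futures = [_encode_sample_helper.remote(storage_ref, subrange)
                   for subrange in split_idxes]

    Ray resolves ObjectRef arguments automatically, so _encode_sample_helper still receives the plain storage list.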