
I want to parallelize the "for loop" iteration using OpenMP threads or a similar technique in Python. The code is shown below. idxes contains 1024 indices; the loop simply picks an index (i), performs an array access at self._storage[i], and unpacks the result into data.

Is there an OpenMP-like technique in Python that I can use to speed up this operation?

Code:

def _encode_sample(self, idxes):
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    for i in idxes:
        data = self._storage[i]
        obs_t, action, reward, obs_tp1, done = data
        obses_t.append(np.array(obs_t, copy=False))
        actions.append(np.array(action, copy=False))
        rewards.append(reward)
        obses_tp1.append(np.array(obs_tp1, copy=False))
        dones.append(done)
    return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)

I want the for loop iterations to be executed in parallel across workers (the original post included a diagram illustrating this).

Code after using ray (answer from @cade):

import numpy as np
import random
import ray
import psutil

num_cpus = psutil.cpu_count(logical=False)
ray.init(num_cpus=num_cpus)



class ReplayBuffer(object):
    def __init__(self, size):
        """Create Prioritized Replay buffer.

        Parameters
        ----------
        size: int
            Max number of transitions to store in the buffer. When the buffer
            overflows the old memories are dropped.
        """
        self._storage = []
        self._maxsize = int(size)
        self._next_idx = 0

    def __len__(self):
        return len(self._storage)

    def clear(self):
        self._storage = []
        self._next_idx = 0

    def add(self, obs_t, action, reward, obs_tp1, done):
        data = (obs_t, action, reward, obs_tp1, done)

        if self._next_idx >= len(self._storage):
            self._storage.append(data)
        else:
            self._storage[self._next_idx] = data
        self._next_idx = (self._next_idx + 1) % self._maxsize

    def _encode_sample(self, idxes):
        #ray.init()
        n = 256
        # split idxes into chunks of n indices using a list comprehension
        split_idxes = [idxes[i * n:(i + 1) * n] for i in range((len(idxes) + n - 1) // n )]
        futures = []

        for subrange in split_idxes:
            futures.append(_encode_sample_helper.remote(self._storage, subrange))

        obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
        outputs = ray.get(futures)

        for a, b, c, d, e in outputs:
            obses_t.extend(a)
            actions.extend(b)
            rewards.extend(c)
            obses_tp1.extend(d)
            dones.extend(e)

        return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)



    def make_index(self, batch_size):
        return [random.randint(0, len(self._storage) - 1) for _ in range(batch_size)]
        

    def make_latest_index(self, batch_size):
        idx = [(self._next_idx - 1 - i) % self._maxsize for i in range(batch_size)]
        np.random.shuffle(idx)
        return idx

    def sample_index(self, idxes):
        return self._encode_sample(idxes)

    def sample(self, batch_size):
        """Sample a batch of experiences.

        Parameters
        ----------
        batch_size: int
            How many transitions to sample.

        Returns
        -------
        obs_batch: np.array
            batch of observations
        act_batch: np.array
            batch of actions executed given obs_batch
        rew_batch: np.array
            rewards received as results of executing act_batch
        next_obs_batch: np.array
            next set of observations seen after executing act_batch
        done_mask: np.array
            done_mask[i] = 1 if executing act_batch[i] resulted in
            the end of an episode and 0 otherwise.
        """
        if batch_size > 0:
            idxes = self.make_index(batch_size)
        else:
            idxes = range(0, len(self._storage))
        return self._encode_sample(idxes)

    def collect(self):
        return self.sample(-1)

@ray.remote(num_cpus=1)
def _encode_sample_helper(_storage, subrange):
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    print(subrange)
    for i in subrange:
        data = _storage[i]
        obs_t, action, reward, obs_tp1, done = data
        obses_t.append(np.array(obs_t, copy=False))
        actions.append(np.array(action, copy=False))
        rewards.append(reward)
        obses_tp1.append(np.array(obs_tp1, copy=False))
        dones.append(done)
    return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)
2 Comments
  • Be careful with your tags. My answer is from the perspective of Python: the threading module runs threads concurrently, not in parallel, as described in my answer. OpenMP, however, is not a plain Python module; it is exposed through Cython, which is compiled and can interact with the processor cores, whereas a Python process runs its threads concurrently within one process. Since Cython is what gives access to OpenMP here, I will update your tag. Commented Apr 5, 2023 at 3:13
  • Python is an interpreted language, so Python code needs an interpreter to be turned into machine code. Cython, by contrast, is compiled: Cython programs are translated to machine code that the CPU can execute directly, without an interpreter. Commented Apr 5, 2023 at 3:14

2 Answers


I am answering with Ray since you allowed for "OpenMP threads or similar".

Ray makes parallelizing Python really easy -- you could do what you want with the following (assuming you have four cores on your machine):

import numpy as np
import ray

def _encode_sample(self, idxes):
    futures = []
    split_idxes = np.array_split(idxes, 4)
    for subrange in split_idxes:
        futures.append(_encode_sample_helper.remote(self._storage, subrange))

    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    outputs = ray.get(futures)

    for a, b, c, d, e in outputs:
        obses_t.extend(a)
        actions.extend(b)
        rewards.extend(c)
        obses_tp1.extend(d)
        dones.extend(e)

    return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)


@ray.remote(num_cpus=1)
def _encode_sample_helper(storage, subrange):
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    for i in subrange:
        data = storage[i]
        obs_t, action, reward, obs_tp1, done = data
        obses_t.append(np.array(obs_t, copy=False))
        actions.append(np.array(action, copy=False))
        rewards.append(reward)
        obses_tp1.append(np.array(obs_tp1, copy=False))
        dones.append(done)
    return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)
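
A side note not from the original answer: passing self._storage directly makes Ray serialize the buffer for every remote call. One common pattern is to put it into the object store once with ray.put and pass the resulting reference, roughly:

storage_ref = ray.put(self._storage)  # store the buffer contents once
futures = [
    _encode_sample_helper.remote(storage_ref, subrange)
    for subrange in split_idxes
]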

10 Comments

Hello, thanks for the help. I'm quite new to ray, but this solution looks interesting and easy. What does the num_cpus=1 indicate? I understood that we just split the array in 4 parts. Where do we specify to run the task on number of cores? Is it the num_cpus parameter? One more doubt is that, if I'm not wrong, the cores will work in parallel for 4 sets of idxes right?
Yes, num_cpus=1 is used to indicate how many cores the function needs. Four tasks run because split_idxes is len 4, so four _encode_sample_helper.remote invocations are made. If your machine has four cores, then yes they will run in parallel.
The issue comes because in your code you placed _encode_sample_helper inside the class. If you place it outside the class it will work. The reason is that Ray functions (called tasks) are stateless; if you need state, e.g. if you want to use a class, you need to wrap the whole class in @ray.remote. I provided the stateless version in my answer because I thought it was simpler, but you can do whichever (see the sketch after these comments).
This link describes Ray Actors, which are classes that can be parallelized docs.ray.io/en/latest/ray-core/actors.html
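For illustration only (this sketch is not from the original answer; the actor wiring and names are assumptions), a minimal Ray Actor version of the same idea could look roughly like this:

import numpy as np
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
class EncoderActor:
    # Holds the storage as actor state so repeated calls reuse it.
    def __init__(self, storage):
        self._storage = storage

    def encode(self, subrange):
        obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
        for i in subrange:
            obs_t, action, reward, obs_tp1, done = self._storage[i]
            obses_t.append(np.array(obs_t, copy=False))
            actions.append(np.array(action, copy=False))
            rewards.append(reward)
            obses_tp1.append(np.array(obs_tp1, copy=False))
            dones.append(done)
        return obses_t, actions, rewards, obses_tp1, dones

# Hypothetical usage: one actor per chunk, results gathered with ray.get.
# actors = [EncoderActor.remote(storage) for _ in range(4)]
# chunks = np.array_split(idxes, 4)
# results = ray.get([a.encode.remote(c) for a, c in zip(actors, chunks)])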

CAUTION: This answer was posted before the tag "Python" was changed to "Cython". See comment.

Be careful with your tags. My answer is from the perspective of Python: the threading module runs threads concurrently, not in parallel, as described in my answer. OpenMP, however, is not a plain Python module; it is exposed through Cython, which is compiled and can interact with the processor cores, whereas a Python process runs its threads concurrently within one process. Since Cython is what gives access to OpenMP here, I will update your tag.


Rather than using a third-party module, it's better to use a standard-library module; there is less overhead. Use the threading module.

The snippet below handles the first 250 items; adjust the range for each set of items. The first parameter is the thread function and the second is a tuple of arguments.

import threading

def thread_logic(start, end):
    # process items in [start, end)
    pass

thread1 = threading.Thread(target=thread_logic, args=(1, 250))
thread1.start()
thread1.join()  # wait for the thread to finish

You won't gain much here in terms of performance because there is a lot of overhead when creating a new thread. Also be careful about adding classes in Python to your source code; that can slow performance due to overhead as well. There will be a point where the threads outperform the overhead and you benefit from threading; in other words, you might need more data than you have to see a benefit. I'd suggest profiling each function (with cProfile) to see when that occurs. Run cProfile with threading and again without, and you will see whether it's even worth doing.

Use method 3 in this link. The report at the end will tell you how many times a function was called and performance information (i.e. the average time to execute that function):

https://www.geeksforgeeks.org/profiling-in-python/
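
As a rough sketch of how such a profile can be captured with the standard library (the buffer object and the call being measured below are placeholders, not from the original code):

import cProfile
import pstats

profiler = cProfile.Profile()
profiler.enable()
buffer.sample(1024)   # placeholder: the call whose cost you want to measure
profiler.disable()

# Print the 10 most expensive calls, sorted by cumulative time.
pstats.Stats(profiler).sort_stats("cumulative").print_stats(10)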

In Python, you are running these threads concurrently, not in parallel, so I don't know what the maximum number of concurrent threads is.

In fact, a Python process cannot run threads in parallel, but it can run them concurrently through context switching during I/O-bound operations. This limitation is enforced by the GIL: the Python Global Interpreter Lock prevents threads within the same process from executing at the same time.


EDIT:

To run processes on different cores in parallel, use the Python module named "multiprocessing".

Use all CPUs via multiprocessing: we can use all CPU cores in our system through process-based concurrency, which is provided in the Python standard library (you don't have to install anything) via the multiprocessing module.

https://docs.python.org/3/library/multiprocessing.html

import multiprocessing

def process_logic(start, end):
    # process items in [start, end)
    pass

if __name__ == "__main__":
    process1 = multiprocessing.Process(target=process_logic, args=(1, 250))
    process1.start()
    process1.join()  # wait for the worker process to finish
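
Applied to the question's loop, a process pool could split the indices into chunks. This is only a sketch with illustrative names and dummy data (not the original code), and note that the storage list has to be pickled to every worker:

import multiprocessing as mp
import numpy as np

def _encode_chunk(args):
    # Encode one chunk of indices; the storage list travels with the chunk.
    storage, idx_chunk = args
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    for i in idx_chunk:
        obs_t, action, reward, obs_tp1, done = storage[i]
        obses_t.append(np.array(obs_t, copy=False))
        actions.append(np.array(action, copy=False))
        rewards.append(reward)
        obses_tp1.append(np.array(obs_tp1, copy=False))
        dones.append(done)
    return obses_t, actions, rewards, obses_tp1, dones

if __name__ == "__main__":
    # Dummy data standing in for the replay buffer contents.
    storage = [(np.zeros(4), 0, 0.0, np.zeros(4), False) for _ in range(10000)]
    idxes = np.random.randint(0, len(storage), size=1024)
    chunks = np.array_split(idxes, mp.cpu_count())
    with mp.Pool() as pool:
        results = pool.map(_encode_chunk, [(storage, chunk) for chunk in chunks])

Whether this is actually faster depends on the size of the storage: pickling it to every worker can easily cost more than the loop it parallelizes.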

12 Comments

Hello, thanks for the suggestion. Do you think we could do some scatter gather implementation and achieve better speedup? If so, can you point me to any sources or links to do that?
I'm sorry. I don't know what that is.
scatter/gather suggests a distributed-memory environment (e.g. mpi4py), whereas threading suggests shared memory (i.e. the data is available to all threads, so there is no need to scatter/gather)
When you say "concurrently, not parallel", what do you mean exactly? That a single thread is executed at a time anyway?
@PierU yeah, in Python only one compute-bound thread can execute at a time per process; it is because of something called the GIL.
