parallel processing – How do I parallelize a simple Python loop?

The Question :

309 people think this question is useful

This is probably a trivial question, but how do I parallelize the following loop in python?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

I know how to start single threads in Python but I don’t know how to “collect” the results.

Multiple processes would be fine too – whatever is easiest for this case. I’m using currently Linux but the code should run on Windows and Mac as-well.

What’s the easiest way to parallelize this code?

The Question Comments :
  • One very easy solution to parallelize a for loop is not yet mentioned as an answer – this would be by simply decorating two functions by using the deco package

The Answer 1

214 people think this answer is useful

Using multiple threads on CPython won’t give you better performance for pure-Python code due to the global interpreter lock (GIL). I suggest using the multiprocessing module instead:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Note that this won’t work in the interactive interpreter.

To avoid the usual FUD around the GIL: There wouldn’t be any advantage to using threads for this example anyway. You want to use processes here, not threads, because they avoid a whole bunch of problems.

The Answer 2

77 people think this answer is useful

To parallelize a simple for loop, joblib brings a lot of value to raw use of multiprocessing. Not only the short syntax, but also things like transparent bunching of iterations when they are very fast (to remove the overhead) or capturing of the traceback of the child process, to have better error reporting.

Disclaimer: I am the original author of joblib.

The Answer 3

64 people think this answer is useful
from joblib import Parallel, delayed
import multiprocessing

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)

The above works beautifully on my machine (Ubuntu, package joblib was pre-installed, but can be installed via pip install joblib).

Taken from https://blog.dominodatalab.com/simple-parallelization/

The Answer 4

61 people think this answer is useful

What’s the easiest way to parallelize this code?

I really like concurrent.futures for this, available in Python3 since version 3.2 – and via backport to 2.6 and 2.7 on PyPi.

You can use threads or processes and use the exact same interface.

Multiprocessing

Put this in a file – futuretest.py:

import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()
    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)
    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()

And here’s the output:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Multithreading

Now change ProcessPoolExecutor to ThreadPoolExecutor, and run the module again:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Now you have done both multithreading and multiprocessing!

Note on performance and using both together.

Sampling is far too small to compare the results.

However, I suspect that multithreading will be faster than multiprocessing in general, especially on Windows, since Windows doesn’t support forking so each new process has to take time to launch. On Linux or Mac they’ll probably be closer.

You can nest multiple threads inside multiple processes, but it’s recommended to not use multiple threads to spin off multiple processes.

The Answer 5

24 people think this answer is useful

This is the easiest way to do it!

You can use asyncio. (Documentation can be found here). It is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc. Plus it has both high-level and low-level APIs to accomodate any kind of problem.

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code

Now this function will be run in parallel whenever called without putting main program into wait state. You can use it to parallelize for loop as well. When called for a for loop, though loop is sequential but every iteration runs in parallel to the main program as soon as interpreter gets there. For instance:

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))


for i in range(10):
    your_function(i)


print('loop finished')

This produces following output:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1

The Answer 6

18 people think this answer is useful

There are a number of advantages to using Ray:

  • You can parallelize over multiple machines in addition to multiple cores (with the same code).
  • Efficient handling of numerical data through shared memory (and zero-copy serialization).
  • High task throughput with distributed scheduling.
  • Fault tolerance.

In your case, you could start Ray and define a remote function

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

and then invoke it in parallel

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

To run the same example on a cluster, the only line that would change would be the call to ray.init(). The relevant documentation can be found here.

Note that I’m helping to develop Ray.

The Answer 7

8 people think this answer is useful

I found joblib is very useful with me. Please see following example:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_jobs=-1: use all available cores

The Answer 8

7 people think this answer is useful

why dont you use threads, and one mutex to protect one global list?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

keep in mind, you will be as fast as your slowest thread

The Answer 9

3 people think this answer is useful

Let’s say we have an async function

async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
    # Do some async procesing    

That needs to be run on a large array. Some attributes are being passed to the program and some are used from property of dictionary element in the array.

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] #Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                           sub['Code'],
                                           loop)
                       for sub in batch))

The Answer 10

2 people think this answer is useful

Have a look at this;

http://docs.python.org/library/queue.html

This might not be the right way to do it, but I’d do something like;

Actual code;

from multiprocessing import Process, JoinableQueue as Queue 

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

Hope that helps.

The Answer 11

2 people think this answer is useful

This could be useful when implementing multiprocessing and parallel/ distributed computing in Python.

YouTube tutorial on using techila package

Techila is a distributed computing middleware, which integrates directly with Python using the techila package. The peach function in the package can be useful in parallelizing loop structures. (Following code snippet is from the Techila Community Forums)

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )

The Answer 12

2 people think this answer is useful

thanks @iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # 'TERM'
    pool.join()   # 'KILL'

The Answer 13

-2 people think this answer is useful

very simple example of parallel processing is

from multiprocessing import Process

output1 = list()
output2 = list()
output3 = list()

def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)

if __name__ == '__main__':
    p = Process(target=pa.yourfunction, args=('bob',))
    p.start()
    p.join()

Add a Comment