Parallel programming in Python: multiprocessing (part 2)

In the previous post we introduced the Pool class of the multiprocessing module. This post continues with the Process class, which gives direct control over individual processes.

A process is created by passing a target function and its input arguments to the Process constructor. The process is then started with the start method, and the join method blocks until the process has finished. Below is a very simple example that prints the square of a number.

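import multiprocessing as mp

def square(x):
    print(x * x)

if __name__ == "__main__":
    # The guard keeps child processes from re-running this block when
    # they are started with "spawn" (the default on Windows and macOS).
    p = mp.Process(target=square, args=(5,))
    p.start()
    p.join()  # wait for the child process to finish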
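
Beyond start and join, a Process object exposes a few attributes that are useful when controlling processes directly. The sketch below is a minimal illustration: the helper describe is a made-up name, while name, is_alive() and exitcode belong to the Process class itself.

```python
import multiprocessing as mp

def square(x):
    print(x * x)

def describe(p):
    """Return a snapshot of a Process object's state (hypothetical helper)."""
    return {"name": p.name, "alive": p.is_alive(), "exitcode": p.exitcode}

if __name__ == "__main__":
    p = mp.Process(target=square, args=(5,), name="square-5")
    print(describe(p))   # not started yet: not alive, exitcode is None
    p.start()
    p.join()             # blocks until the child has finished
    print(describe(p))   # finished: not alive, exitcode 0 on success
```

Checking exitcode after join is a simple way to notice that a child process ended with an error.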

Process and Queue

In practice we usually want a process to return its computed result rather than just print it, as in the previous example. The Queue class in the multiprocessing module makes this possible: its put method deposits data and its get method retrieves it. The earlier example can be changed as follows so that the result is returned through a Queue object named qout instead of being printed.

import multiprocessing as mp

def square(x, q):
    q.put(x * x)

if __name__ == "__main__":
    qout = mp.Queue()
    p = mp.Process(target=square, args=(5, qout))
    p.start()

    # Fetch the result before joining; get() blocks until it arrives.
    result = qout.get()
    p.join()

    print(result)
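
If the child process fails before it calls put, a plain qout.get() waits forever. One possible safeguard is to pass a timeout to get, which raises queue.Empty when nothing arrives in time. The sketch below assumes that a few seconds is a generous upper bound for the computation. The helper name get_or_default and the 5-second limit are illustrative choices, not part of the original example.

```python
import multiprocessing as mp
import queue  # only needed for the queue.Empty exception

def square(x, q):
    q.put(x * x)

def get_or_default(q, timeout, default=None):
    """Return the next item from q, or default if none arrives in time."""
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return default

if __name__ == "__main__":
    qout = mp.Queue()
    p = mp.Process(target=square, args=(5, qout))
    p.start()
    result = get_or_default(qout, timeout=5.0)  # assumed upper bound
    p.join()
    print(result)
```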

Now we can parallelize the code by creating multiple processes and running them simultaneously.

import multiprocessing as mp

def square(x, q):
    q.put(x * x)

if __name__ == "__main__":
    qout = mp.Queue()
    processes = [mp.Process(target=square, args=(i, qout))
                 for i in range(2, 10)]

    for p in processes:
        p.start()

    # Collect one result per process before joining; get() blocks
    # until a value is available.
    result = [qout.get() for _ in processes]

    for p in processes:
        p.join()

    print(result)

We would expect the output to be as follows.

[4, 9, 16, 25, 36, 49, 64, 81]

However, the values arrive in the Queue object in the order in which the processes put them there, which is essentially the order in which the processes finish. The squares in qout are therefore not guaranteed to follow the ascending order of the input values. To demonstrate this, we can introduce some randomness in the execution time of the square function. This mimics what happens in realistic calculations, where the computational load is not perfectly balanced amongst the processes.

from time import sleep
from random import randint

def square(x, q):
    sleep(0.01 * randint(0, 100))
    q.put(x * x)

Then, for example, we might obtain the following results, instead of the list of square numbers in ascending order.

[36, 9, 16, 81, 25, 64, 4, 49]

Bear in mind that the random delay is introduced in this short example only to show the effect of asynchronous execution. In realistic code, where each process performs far more computation, the order of the output values often differs from the order of the corresponding input values and should never be relied upon.

If we want the results to be output in the same order as the input values (so that in the initial short example the results would be in ascending order), we can pack each input value and the resulting value for its square in a tuple, and save the tuples in the Queue object.

def square(x, q):
    sleep(0.01 * randint(0, 100))
    q.put((x, x * x))

After the processes have been executed, the ascending order is recovered by sorting the results saved in qout.

unsorted_result = [qout.get() for p in processes]
result = [t[1] for t in sorted(unsorted_result)]
print(result)
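
Sorting works here because Python compares tuples element by element, so the pairs are ordered by their first entry, the input value. The short check below rebuilds the randomly ordered output shown earlier as (input, square) pairs. It also shows that sorting by input value matches the input order only when the inputs themselves are ascending. The variable names are made up for illustration.

```python
# (input, square) pairs in the arrival order of the earlier example output
unsorted_result = [(6, 36), (3, 9), (4, 16), (9, 81),
                   (5, 25), (8, 64), (2, 4), (7, 49)]

# Tuples compare element by element, so this sorts by input value.
result = [t[1] for t in sorted(unsorted_result)]
print(result)   # [4, 9, 16, 25, 36, 49, 64, 81]

# With inputs that are not ascending, sorting by value loses the input order.
inputs = [2, 4, 6, 8, 3, 5, 7, 9]
pairs = [(x, x * x) for x in inputs]
by_value = [t[1] for t in sorted(pairs)]
in_input_order = [x * x for x in inputs]
print(by_value == in_input_order)   # False
```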

You may have noticed that the above example is special: the input values are already in ascending order, so sorting by input value also restores the input order. More generally, if the input values are not in ascending order, we need to tag each output value with its “process index” or “input index”, rather than the actual input value, in order to obtain the output values in the desired order. This is illustrated in the following code.

import multiprocessing as mp
from random import randint
from time import sleep

def square(i, x, q):
    sleep(0.01 * randint(0, 100))
    q.put((i, x * x))  # tag the result with its input index

if __name__ == "__main__":
    input_values = [2, 4, 6, 8, 3, 5, 7, 9]
    qout = mp.Queue()
    processes = [mp.Process(target=square, args=(ind, val, qout))
                 for ind, val in enumerate(input_values)]

    for p in processes:
        p.start()

    # Collect one (index, value) pair per process before joining.
    unsorted_result = [qout.get() for _ in processes]

    for p in processes:
        p.join()

    # Sorting the pairs by index restores the input order.
    result = [t[1] for t in sorted(unsorted_result)]
    print(result)

The output would then be as follows, where the values are in one-to-one correspondence with the input values [2, 4, 6, 8, 3, 5, 7, 9].

[4, 16, 36, 64, 9, 25, 49, 81]
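
Since each result carries its input index, sorting is not the only option. As an alternative sketch (not the approach used above), the results can be written straight into a pre-sized list, which avoids the sort. The helper name collect_in_order is hypothetical. It only assumes an object with a blocking get method, such as mp.Queue.

```python
import multiprocessing as mp

def square(i, x, q):
    q.put((i, x * x))

def collect_in_order(q, n):
    """Read n (index, value) pairs from q and return the values by index."""
    results = [None] * n
    for _ in range(n):
        i, value = q.get()   # blocks until the next pair arrives
        results[i] = value
    return results

if __name__ == "__main__":
    input_values = [2, 4, 6, 8, 3, 5, 7, 9]
    qout = mp.Queue()
    processes = [mp.Process(target=square, args=(ind, val, qout))
                 for ind, val in enumerate(input_values)]
    for p in processes:
        p.start()
    result = collect_in_order(qout, len(processes))
    for p in processes:
        p.join()
    print(result)
```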

Example: computing π

Now let’s revisit the example of computing π in the previous post. As mentioned before, the computed result from each process should be saved in a Queue object. We therefore need to modify the calc_partial_pi routine.

def calc_partial_pi(rank, nprocs, nsteps, dx, qout):
    partial_pi = 0.0
    for i in range(rank, nsteps, nprocs):
        x = (i + 0.5) * dx
        partial_pi += 4.0 / (1.0 + x * x)
    partial_pi *= dx
    qout.put(partial_pi)
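
The loop over range(rank, nsteps, nprocs) hands out the integration steps in round-robin fashion: the process with a given rank takes steps rank, rank + nprocs, rank + 2 * nprocs, and so on. The small check below uses made-up sizes (10 steps, 3 processes) to illustrate that every step is assigned to exactly one process.

```python
nsteps, nprocs = 10, 3   # small, made-up sizes for illustration

# Steps handled by each rank, using the same range() as calc_partial_pi.
steps_per_rank = [list(range(rank, nsteps, nprocs)) for rank in range(nprocs)]
for rank, steps in enumerate(steps_per_rank):
    print(rank, steps)
# 0 [0, 3, 6, 9]
# 1 [1, 4, 7]
# 2 [2, 5, 8]

# Together the ranks cover every step exactly once.
all_steps = sorted(s for steps in steps_per_rank for s in steps)
print(all_steps == list(range(nsteps)))   # True
```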

Then we can set the number of steps and step size for numerical integration, as well as the desired number of processes. Ideally we would like to have one process per CPU core; mp.cpu_count() returns the number of logical CPUs in the system.

nsteps = 10000000
dx = 1.0 / nsteps
nprocs = mp.cpu_count()

Based on the desired number of processes, we prepare a list of input arguments and create the processes.

qout = mp.Queue()
inputs = [(rank, nprocs, nsteps, dx, qout) for rank in range(nprocs)]
processes = [mp.Process(target=calc_partial_pi, args=inp)
             for inp in inputs]

After asynchronous execution of the processes, the computed value of π is obtained.

for p in processes:
    p.start()

# Collect one partial sum per process before joining.
result = [qout.get() for _ in processes]

for p in processes:
    p.join()

pi = sum(result)
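
For convenience, the fragments above can be assembled into one script. This is a sketch rather than a definitive listing: the main function and the __main__ guard are additions. The guard is needed when child processes are started with the spawn method, the default on Windows and macOS. The results are read from the queue before the processes are joined.

```python
import math
import multiprocessing as mp

def calc_partial_pi(rank, nprocs, nsteps, dx, qout):
    partial_pi = 0.0
    for i in range(rank, nsteps, nprocs):
        x = (i + 0.5) * dx
        partial_pi += 4.0 / (1.0 + x * x)
    partial_pi *= dx
    qout.put(partial_pi)

def main(nsteps=10000000):
    dx = 1.0 / nsteps
    nprocs = mp.cpu_count()
    qout = mp.Queue()
    processes = [mp.Process(target=calc_partial_pi,
                            args=(rank, nprocs, nsteps, dx, qout))
                 for rank in range(nprocs)]
    for p in processes:
        p.start()
    # One get() per process; each call blocks until a partial sum arrives.
    result = [qout.get() for _ in processes]
    for p in processes:
        p.join()
    return sum(result)

if __name__ == "__main__":
    pi = main()
    print(pi, abs(pi - math.pi))
```

The second printed number is the absolute difference from math.pi, which gives a quick sanity check on the chosen number of steps.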

For a simple example like computing π, the Process class provides very similar scaling to the Pool class introduced in the previous post.

Summary

  • The Process class makes it possible to control the processes directly.
  • The Queue class can be used to save results from the processes.
  • The processes are executed asynchronously.
  • The order of the output is not guaranteed to correspond to that of the input values.