In the previous post we introduced the `Pool` class of the `multiprocessing` module. In this post we continue with the `Process` class, which gives direct control over individual processes.
A process is created by passing a target function and its input arguments to the `Process` constructor. The process is then started with the `start` method, and the `join` method waits for it to finish. Below is a very simple example that prints the square of a number.
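```python
import multiprocessing as mp

def square(x):
    print(x * x)

if __name__ == "__main__":
    # The guard keeps child processes from re-running this block on
    # platforms that start them by importing the main module.
    p = mp.Process(target=square, args=(5,))
    p.start()  # launch the child process
    p.join()   # wait for it to finish
```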
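To make the "direct control" a little more concrete, here is a minimal sketch that inspects a process before, during and after its run using the `name`, `pid`, `is_alive` and `exitcode` members of `Process`. The function `slow_square`, its `delay` parameter and the name `square-worker` are made up for this illustration. Note that the value returned by the target function is discarded by `Process`.

```python
import multiprocessing as mp
import time


def slow_square(x, delay=0.5):
    """Hypothetical helper: sleep briefly, then print and return x squared."""
    time.sleep(delay)
    result = x * x
    print(result)
    return result  # Process ignores this return value


if __name__ == "__main__":
    p = mp.Process(target=slow_square, args=(5,), name="square-worker")
    print(p.is_alive())              # False: the process has not been started
    p.start()
    print(p.name, p.pid)             # the name we chose and the OS process id
    print(p.is_alive())              # typically True while the child is sleeping
    p.join()                         # block until the child has finished
    print(p.is_alive(), p.exitcode)  # False and 0 after a normal exit
```

The `exitcode` attribute is `None` while the process is still running, which makes it a simple way to check for completion without blocking.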
Process and Queue
In practice, we often want the process to return the computed result rather than just print it, as in the previous example. This can be achieved with the `Queue` class in the `multiprocessing` module. The `Queue` class provides the `put` method for depositing data and the `get` method for retrieving data. The earlier example can be changed as follows so that it returns the result instead of printing it. Here a `Queue` object named `qout` is used to save the result.
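```python
import multiprocessing as mp

def square(x, q):
    q.put(x * x)

if __name__ == "__main__":
    qout = mp.Queue()
    p = mp.Process(target=square, args=(5, qout))
    p.start()
    # Retrieve the result before joining, so the child is never left
    # waiting to flush its data to the queue.
    result = qout.get()
    p.join()
    print(result)
```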
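One detail of `get` is worth a small sketch: by default it blocks until an item is available, so asking for more items than were deposited would wait forever. The optional `timeout` argument makes `get` raise `queue.Empty` instead. The helper `drain` below is a hypothetical name introduced for this illustration, and the example runs in a single process to keep it short.

```python
import multiprocessing as mp
import queue  # only needed for the queue.Empty exception


def drain(q, n, timeout=1.0):
    """Hypothetical helper: retrieve up to n items, stopping early on timeout."""
    items = []
    for _ in range(n):
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            break  # nothing arrived within the timeout
    return items


if __name__ == "__main__":
    q = mp.Queue()
    for value in (4, 9, 16):
        q.put(value)
    # Ask for five items although only three were deposited.
    print(drain(q, 5))  # [4, 9, 16], after the fourth get times out
```

Items deposited by a single producer come out in the order in which they were put in.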
Now we can parallelize the code by creating multiple processes and running them simultaneously.
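```python
import multiprocessing as mp

def square(x, q):
    q.put(x * x)

if __name__ == "__main__":
    qout = mp.Queue()
    processes = [mp.Process(target=square, args=(i, qout)) for i in range(2, 10)]
    for p in processes:
        p.start()
    # Collect one result per process before joining, so that no process
    # is left waiting to flush its data to the queue.
    result = [qout.get() for p in processes]
    for p in processes:
        p.join()
    print(result)
```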
We would expect the output to be as follows.
[4, 9, 16, 25, 36, 49, 64, 81]
However, the values in the `Queue` object are stored in the order in which the individual processes finish. This means that the order of the squares in `qout` will not necessarily match the ascending order of the input values. To demonstrate this, we can introduce some randomness in the execution time of the `square` function. This mimics what happens in realistic calculations, where the computational load is not perfectly balanced amongst the processes.
```python
from random import randint
from time import sleep

def square(x, q):
    # Sleep for a random time of up to one second to mimic uneven workloads.
    sleep(0.01 * randint(0, 100))
    q.put(x * x)
```
Then, for example, we might obtain the following results, instead of the list of square numbers in ascending order.
[36, 9, 16, 81, 25, 64, 4, 49]
Bear in mind that the randomising function is only introduced in this short example to show the effect of asynchronous execution of processes. In realistic code that would perform far more calculations than in the short example, the order of the output values is almost always different from the order of the corresponding input values.
If we want the results to be output in the same order as the input values (so that in the initial short example the results would be in ascending order), we can pack each input value and its square in a tuple and save the tuples in the `Queue` object.
```python
def square(x, q):
    sleep(0.01 * randint(0, 100))
    q.put((x, x * x))  # keep the input value alongside its square
```
After the processes have been executed, the ascending order is recovered by sorting the results saved in `qout`.
```python
unsorted_result = [qout.get() for p in processes]
# Sort the (input, square) tuples by input value, then keep only the squares.
result = [t[1] for t in sorted(unsorted_result)]
print(result)
```
You may have noticed that the above example is special in that the input values are already in ascending order, so sorting by input value reproduces the input order. More generally, if the input values are not in ascending order, we need to tag each output value with the “process index” or “input index”, rather than the actual input value, in order to obtain the output values in the desired order. This is illustrated in the following code.
```python
import multiprocessing as mp
from random import randint
from time import sleep

def square(i, x, q):
    sleep(0.01 * randint(0, 100))
    q.put((i, x * x))  # tag the square with the index of its input

if __name__ == "__main__":
    input_values = [2, 4, 6, 8, 3, 5, 7, 9]
    qout = mp.Queue()
    processes = [mp.Process(target=square, args=(ind, val, qout))
                 for ind, val in enumerate(input_values)]
    for p in processes:
        p.start()
    # Collect one result per process before joining.
    unsorted_result = [qout.get() for p in processes]
    for p in processes:
        p.join()
    # Sort by input index, then keep only the squares.
    result = [t[1] for t in sorted(unsorted_result)]
    print(result)
```
The output would then be as follows, where the values are in one-to-one correspondence with the input values `[2, 4, 6, 8, 3, 5, 7, 9]`.
[4, 16, 36, 64, 9, 25, 49, 81]
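Since each result carries its input index, sorting is not the only way to restore the order. As an alternative sketch (not the approach used above), each value can be written straight into its slot of a pre-allocated list. The helper `place_by_index` is a hypothetical name introduced for this illustration.

```python
import multiprocessing as mp


def square(i, x, q):
    q.put((i, x * x))  # tag the square with the index of its input


def place_by_index(pairs, n):
    """Hypothetical helper: put each (index, value) pair into slot index."""
    results = [None] * n
    for i, value in pairs:
        results[i] = value
    return results


if __name__ == "__main__":
    input_values = [2, 4, 6, 8, 3, 5, 7, 9]
    qout = mp.Queue()
    processes = [mp.Process(target=square, args=(i, x, qout))
                 for i, x in enumerate(input_values)]
    for p in processes:
        p.start()
    pairs = [qout.get() for _ in processes]  # arrives in completion order
    for p in processes:
        p.join()
    print(place_by_index(pairs, len(input_values)))
    # [4, 16, 36, 64, 9, 25, 49, 81]
```

A slot that is still `None` afterwards would indicate a missing result, which is harder to spot after a plain sort.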
Example: computing π
Now let’s revisit the example of computing π from the previous post. As mentioned before, the computed result from each process should be saved in a `Queue` object. We therefore need to modify the `calc_partial_pi` function so that it puts its partial sum in the queue.
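```python
def calc_partial_pi(rank, nprocs, nsteps, dx, qout):
    partial_pi = 0.0
    # Each process handles the steps rank, rank + nprocs, rank + 2 * nprocs, ...
    for i in range(rank, nsteps, nprocs):
        x = (i + 0.5) * dx  # midpoint of the i-th interval
        partial_pi += 4.0 / (1.0 + x * x)
    partial_pi *= dx
    qout.put(partial_pi)
```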
Then we can set the number of steps and step size for numerical integration, as well as the desired number of processes. Ideally we would like to have one process per CPU core.
```python
nsteps = 10000000
dx = 1.0 / nsteps
nprocs = mp.cpu_count()
```
Based on the desired number of processes, we prepare a list of input arguments and create the processes.
```python
qout = mp.Queue()
inputs = [(rank, nprocs, nsteps, dx, qout) for rank in range(nprocs)]
processes = [mp.Process(target=calc_partial_pi, args=inp) for inp in inputs]
```
After asynchronous execution of the processes, the computed value of π is obtained.
```python
for p in processes:
    p.start()
# Retrieve one partial sum per process, then wait for the processes to exit.
result = [qout.get() for p in processes]
for p in processes:
    p.join()
pi = sum(result)
```
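For convenience, the pieces above can be assembled into one script along the following lines. This is a sketch rather than the exact code from the post: the helper names `partial_pi` and `compute_pi` are introduced here, the summation is split into a plain function so that it can be checked without starting any processes, and the result is compared against `math.pi`.

```python
import math
import multiprocessing as mp


def partial_pi(rank, nprocs, nsteps, dx):
    """Hypothetical helper: midpoint-rule sum over steps rank, rank + nprocs, ..."""
    total = 0.0
    for i in range(rank, nsteps, nprocs):
        x = (i + 0.5) * dx
        total += 4.0 / (1.0 + x * x)
    return total * dx


def calc_partial_pi(rank, nprocs, nsteps, dx, qout):
    """Worker: compute the share of this rank and deposit it in the queue."""
    qout.put(partial_pi(rank, nprocs, nsteps, dx))


def compute_pi(nsteps, nprocs):
    """Hypothetical helper: run one process per rank and sum the partial results."""
    dx = 1.0 / nsteps
    qout = mp.Queue()
    processes = [mp.Process(target=calc_partial_pi,
                            args=(rank, nprocs, nsteps, dx, qout))
                 for rank in range(nprocs)]
    for p in processes:
        p.start()
    parts = [qout.get() for _ in processes]  # one partial sum per process
    for p in processes:
        p.join()
    return sum(parts)


if __name__ == "__main__":
    pi = compute_pi(nsteps=10_000_000, nprocs=mp.cpu_count())
    print(pi, abs(pi - math.pi))  # computed value and its absolute error
```

Because addition is commutative, the order in which the partial sums arrive does not matter here, apart from tiny floating-point rounding differences.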
For a simple example like computing π, the `Process` class provides very similar scaling to the `Pool` class introduced in the previous post.
- The `Process` class makes it possible to control the processes directly.
- The `Queue` class can be used to save results from the processes.
- The processes are executed asynchronously.
- The order of the output is not guaranteed to correspond to that of the input values.