Parallel programming in Python: mpi4py (part 2)

In part 1 of this post, we introduced the mpi4py module (MPI for Python) which provides an object-oriented interface for Python resembling the message passing interface (MPI) and enables Python programs to exploit multiple processors on multiple compute nodes.

The mpi4py module provides methods for communicating various types of Python objects in different ways. In part 1 of this post we showed how to communicate generic Python objects between MPI processes; the methods for doing this have all-lowercase names. It is also possible to directly send buffer-like objects, whose data are exposed in raw format and can be accessed without copying, between MPI processes. The methods for doing this start with an uppercase letter.

In this post we continue introducing the mpi4py module, with a focus on the direct communication of buffer-like objects using the latter type of methods (that is, those starting with a capital letter), including Send, Recv, Isend, Irecv, Bcast, and Reduce, as well as Scatterv and Gatherv, which are vector variants of Scatter and Gather, respectively.

Buffer-like objects

The mpi4py module provides methods for directly sending and receiving buffer-like objects. The advantage of working with buffer-like objects is that the communication is fast (close to the speed of MPI communication in C). However, there is a disadvantage in that the user needs to be more explicit when it comes to handling the allocation of memory space. For example, the memory of the receiving buffer needs to be allocated prior to the communication, and the size of the sending buffer should not exceed that of the receiving buffer. One should also be aware that mpi4py expects the buffer-like objects to have contiguous memory. Fortunately, this is usually the case with Numpy arrays, which are probably the most commonly used buffer-like objects in scientific computing in Python.
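The contiguity requirement can be checked (and satisfied) with plain NumPy, before any MPI call is made. As a small sketch: transposing an array produces a non-contiguous view, and np.ascontiguousarray returns a contiguous copy that is safe to pass to the uppercase methods.

```python
import numpy as np

a = np.arange(6.0).reshape(2, 3)
b = a.T   # transposing gives a non-contiguous view of the same data
print(b.flags['C_CONTIGUOUS'])   # False

# np.ascontiguousarray returns a contiguous copy, which can safely be
# used as a buffer in methods such as Send and Recv
c = np.ascontiguousarray(b)
print(c.flags['C_CONTIGUOUS'])   # True
```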

In mpi4py, a buffer-like object can be specified using a list or tuple with two or three elements (or four elements for the vector variants, which will be described in later sections). For example, for a Numpy array named “data” that consists of double-precision numbers, we can use [data, n, MPI.DOUBLE], where n is a positive integer, to refer to the buffer of the first n elements. It is also possible to use [data, MPI.DOUBLE], or simply data, to refer to the buffer of the whole array. In the following sections, we’ll demonstrate the communication of Numpy arrays using mpi4py.

Point-to-point communication

In the previous post we introduced point-to-point communication using the all-lowercase methods (send and recv) in mpi4py. The use of the methods with a leading uppercase letter (Send and Recv) is quite similar, except that the receiving buffer needs to be initialized before the Recv method is called. An example code for passing a Numpy array from the master process (which has rank 0) to the worker processes (which all have rank > 0) is shown below.

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# master process
if rank == 0:
    data = np.arange(4.)
    # master process sends data to worker processes by
    # going through the ranks of all worker processes
    for i in range(1, size):
        comm.Send(data, dest=i, tag=i)
        print('Process {} sent data:'.format(rank), data)

# worker processes
else:
    # initialize the receiving buffer
    data = np.zeros(4)
    # receive data from master process
    comm.Recv(data, source=0, tag=rank)
    print('Process {} received data:'.format(rank), data)

Note that the data array was initialized on the worker processes before the Recv method was called, and that the Recv method takes data as the first argument (in contrast to the recv method which returns the data object). The output from running the above example (send-recv.py) on 4 processes is as follows:

$ module load mpi4py/3.0.2/py37

$ salloc --nodes 1 -t 00:10:00 -A <your-project-account>
salloc: Granted job allocation <your-job-id>

$ srun -n 4 python3 send-recv.py
Process 0 sent data: [0. 1. 2. 3.]
Process 0 sent data: [0. 1. 2. 3.]
Process 0 sent data: [0. 1. 2. 3.]
Process 1 received data: [0. 1. 2. 3.]
Process 2 received data: [0. 1. 2. 3.]
Process 3 received data: [0. 1. 2. 3.]

When using the Send and Recv methods, one thing to keep in mind is that the sending buffer and the receiving buffer should match in size. An error will occur if the size of the sending buffer is greater than that of the receiving buffer. It is, however, possible to have a receiving buffer that is larger than the sending buffer. This is illustrated in the following example, where the sending buffer has 4 elements and the size of the receiving buffer is 6. The communication will complete without error, but only the first 4 elements of the receiving buffer will be overwritten (the last 2 elements will remain zero).

if rank == 0:
    data = np.arange(4.)
    for i in range(1, size):
        comm.Send(data, dest=i, tag=i)
        print('Process {} sent data:'.format(rank), data)

else:
    # note: the size of the receiving buffer is larger than
    # that of the sending buffer
    data = np.zeros(6)
    comm.Recv(data, source=0, tag=rank)
    print('Process {} has data:'.format(rank), data)

The output is as follows. Note that the last two elements of the receiving buffers are zero.

$ srun -n 4 python3 send-recv.py
Process 0 sent data: [0. 1. 2. 3.]
Process 0 sent data: [0. 1. 2. 3.]
Process 0 sent data: [0. 1. 2. 3.]
Process 1 has data: [0. 1. 2. 3. 0. 0.]
Process 2 has data: [0. 1. 2. 3. 0. 0.]
Process 3 has data: [0. 1. 2. 3. 0. 0.]

In part 1 of this post we also discussed blocking and non-blocking methods for point-to-point communication. In mpi4py, the non-blocking communication methods for buffer-like objects are Isend and Irecv. The use of the non-blocking methods is shown in the example below.

if rank == 0:
    data = np.arange(4.)
    for i in range(1, size):
        req = comm.Isend(data, dest=i, tag=i)
        req.Wait()
        print('Process {} sent data:'.format(rank), data)

else:
    data = np.zeros(4)
    req = comm.Irecv(data, source=0, tag=rank)
    req.Wait()
    print('Process {} received data:'.format(rank), data)

Collective communication

As we mentioned in part 1 of this post, collective communication methods are very useful in parallel programming. In the example below we use the Bcast method to broadcast a buffer-like object data from the master process to all the worker processes. Note that data needs to be initialized on the worker processes before Bcast is called.

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = np.arange(4.0)
else:
    data = np.zeros(4)

comm.Bcast(data, root=0)

print('Process {} has data:'.format(rank), data)

The output from running the above code (broadcast.py) on 4 processes is as follows:

$ srun -n 4 python3 broadcast.py
Process 0 has data: [0. 1. 2. 3.]
Process 1 has data: [0. 1. 2. 3.]
Process 2 has data: [0. 1. 2. 3.]
Process 3 has data: [0. 1. 2. 3.]

Another collective communication method is Scatter, which sends slices of a large array to the worker processes. In part 1 of this post, we showed that the all-lowercase scatter method is convenient for sending slices of Python objects. However, Scatter is less convenient in practice, since it requires the size of the large array to be divisible by the number of processes. For instance, if we know beforehand that the array to be distributed has 16 elements, it is straightforward to use Scatter to distribute the array over 4 processes so that each process gets 4 elements. In practice, however, the size of the array is usually not known beforehand and is therefore not guaranteed to be divisible by the number of available processes. It is then more practical to use Scatterv, the vector version of Scatter, which offers a much more flexible way to distribute the array. The code below distributes 15 numbers over 4 processes. Note that we use “[sendbuf, count, displ, MPI.DOUBLE]” to specify the buffer-like object, where count contains the number of elements to be sent to each process and displ contains the starting indices of the sub-tasks. (These indices are often known as displacements.)

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()

if rank == 0:
    sendbuf = np.arange(15.0)

    # count: the size of each sub-task
    ave, res = divmod(sendbuf.size, nprocs)
    count = [ave + 1 if p < res else ave for p in range(nprocs)]
    count = np.array(count)

    # displacement: the starting index of each sub-task
    displ = [sum(count[:p]) for p in range(nprocs)]
    displ = np.array(displ)
else:
    sendbuf = None
    # initialize count on worker processes
    count = np.zeros(nprocs, dtype=np.int64)
    displ = None

# broadcast count
comm.Bcast(count, root=0)

# initialize recvbuf on all processes
recvbuf = np.zeros(count[rank])

comm.Scatterv([sendbuf, count, displ, MPI.DOUBLE], recvbuf, root=0)

print('After Scatterv, process {} has data:'.format(rank), recvbuf)

The output from running the above code on 4 processes is as follows. Note that, as there are 15 elements in total, each of the first three processes received 4 elements, and the last process received 3 elements.

$ srun -n 4 python3 scatter-array.py
After Scatterv, process 0 has data: [0. 1. 2. 3.]
After Scatterv, process 1 has data: [4. 5. 6. 7.]
After Scatterv, process 2 has data: [8. 9. 10. 11.]
After Scatterv, process 3 has data: [12. 13. 14.]

Gatherv is the reverse operation of Scatterv. When using Gatherv, one needs to specify the receiving buffer as “[recvbuf2, count, displ, MPI.DOUBLE]”, as shown in the following code. The sendbuf2 arrays will be gathered into a large array recvbuf2 on the master process.

sendbuf2 = recvbuf
recvbuf2 = np.zeros(sum(count))
comm.Gatherv(sendbuf2, [recvbuf2, count, displ, MPI.DOUBLE], root=0)

if comm.Get_rank() == 0:
    print('After Gatherv, process 0 has data:', recvbuf2)

The output from running the Gatherv code on 4 processes is as follows.

$ srun -n 4 python3 scatter-gather.py
After Gatherv, process 0 has data: [ 0.  1.  2.  3.  4.  5.  6.  7.  8.  9. 10. 11. 12. 13. 14.]

Reduce, on the other hand, can be used to compute the sum of (or to perform another operation on) the data collected from all processes. For example, after calling Scatterv, we can compute the sum of the numbers in recvbuf on each process, and then call Reduce to add all of those partial contributions and store the result on the master process.

partial_sum = np.zeros(1)
partial_sum[0] = sum(recvbuf)
print('Partial sum on process {} is:'.format(rank), partial_sum[0])

total_sum = np.zeros(1)
comm.Reduce(partial_sum, total_sum, op=MPI.SUM, root=0)
if comm.Get_rank() == 0:
    print('After Reduce, total sum on process 0 is:', total_sum[0])

The output from running the Reduce code on 4 processes is as follows.

$ srun -n 4 python3 scatter-reduce.py
Partial sum on process 0 is: 6.0
Partial sum on process 1 is: 22.0
Partial sum on process 2 is: 38.0
Partial sum on process 3 is: 39.0
After Reduce, total sum on process 0 is: 105.0

Summary

We have shown how to directly communicate buffer-like objects using the mpi4py module and its methods that start with an uppercase letter. The communication of buffer-like objects is faster, but less flexible, than the communication of generic Python objects.

You may read the MPI for Python tutorial page for more information about the mpi4py module.