What is Rmpi?

The Rmpi library provides the basic functions that enable R processes on separate machines to cooperate in a parallel calculation. It makes the core of the MPI library visible to, and usable by, R. In most cases, Rmpi is not used directly; rather, it is used behind the scenes by the snow or parallel libraries. The examples that follow are a good way to test that R can use the basic functionality of MPI, but for the most part you will only need to do this if you are seeing errors from a library that uses Rmpi.
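
As an illustrative sketch only (assuming the snow and Rmpi packages are installed and R was started under mpirun as described below), the parallel package can create an MPI-backed cluster of workers like this:

# Sketch: use Rmpi indirectly through the parallel package
library(parallel)

# Create three MPI-backed workers; parallel delegates to snow, which uses Rmpi
cl <- makeCluster(3, type = "MPI")

# Run a trivial function on each element, spread across the workers
parSapply(cl, 1:3, function(i) i^2)

# Shut the workers down and finalize MPI before quitting
stopCluster(cl)
Rmpi::mpi.quit()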

Rmpi setup

R needs to be started as an MPI program with mpirun, because Rmpi must be able to query MPI about the layout of the nodes and processors assigned to the job when it assigns workers to them.

For this reason, we need an R profile that is accessible to every R process at startup, and a special startup script for R. When using Rmpi, you will use the command Rmpi instead of unadorned R.
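
As an aside, the Rmpi package itself typically ships such a profile (a file named Rprofile) inside its installed package directory, and wrapper commands like Rmpi arrange for R to read it at startup. Purely as an illustration (your job scripts do not need this), you can ask R where that file lives:

# Illustration only: locate the startup profile bundled with the Rmpi package
# (returns "" if your installation does not include one)
system.file("Rprofile", package = "Rmpi")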

Running Rmpi

We will show only a very simple Rmpi example. Create the file testRmpi.R with the following lines.

# Load the R MPI package if it is not already loaded.
if (!is.loaded("mpi_initialize")) {
    library("Rmpi")
}
# Spawn N-1 workers
mpi.spawn.Rslaves(nslaves=mpi.universe.size()-1)

# The command we want to run on all the nodes/processors we have
mpi.remote.exec(paste("I am ", mpi.comm.rank(), " of ",
                       mpi.comm.size(), " on ",
                       Sys.info()
                       [c("nodename")]
                     )
               )

# Stop the worker processes
mpi.close.Rslaves()

# Close down the MPI processes and quit R
mpi.quit()
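
The script above only has each worker report its identity. For real work, Rmpi also provides parallel apply functions that split a computation across the spawned workers. As a minimal sketch, something like the following could replace the mpi.remote.exec() call above:

# Sketch: distribute a simple computation across the spawned workers
results <- mpi.parSapply(1:100, function(x) x^2)
print(sum(results))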

Next, create a PBS script to run it, for example, testRmpi.pbs, as follows.

#### PBS preamble
#PBS -N Rmpi_test
#PBS -M uniqname@umich.edu
#PBS -m abe

#PBS -l procs=4,tpn=1,pmem=1gb,walltime=1:00:00
#PBS -j oe
#PBS -V

#PBS -A example_flux
#PBS -l qos=flux
#PBS -q flux

#### End PBS preamble
# Include the next three lines always
if [ -e "${PBS_NODEFILE}" ] ; then
    uniq -c "$PBS_NODEFILE"
fi
# Put your job commands after this line

# cd to your execution directory and print for verification
cd $PBS_O_WORKDIR
echo "Working from $(pwd)"

# Note we need to use an option to suppress a large warning about forks
export OMPI_MCA_mpi_warn_on_fork=0

# Run this from mpirun so that R starts in an MPI environment
mpirun -np 1 Rmpi CMD BATCH --no-restore --no-save --quiet \
    testRmpi.R testRmpi.out

NOTE: We are using the line continuation character, \, so that our long Rmpi command can span more than one line for readability. Also note the --quiet option, which suppresses the R startup banner reporting the version number and so on.

Before we run this, we need to load the Rmpi module. The example shows removing all modules prior to loading Rmpi. This is not, strictly speaking, necessary, but you should unload any loaded openmpi modules, as the Rmpi module provides its own.

$ module purge
$ module load Rmpi

You can now submit the job:

$ qsub testRmpi.pbs

The R output should look somewhat like this. There are four lines that identify each “rank”, which is essentially synonymous with processor. The master is rank 0, so the numbering of the remaining processes ends at one less than the number of processors requested from PBS. The commands run should identify the same nodes and ranks as the mpi.spawn.Rslaves() output. It is important to note that MPI does not guarantee the order of evaluation, so it is possible that rank 3 will report before rank 1.

> # Load the R MPI package if it is not already loaded.
> if (!is.loaded("mpi_initialize")) {
+     library("Rmpi")
+ }
> # Spawn N-1 workers
> mpi.spawn.Rslaves(nslaves=mpi.universe.size()-1)
    3 slaves are spawned successfully. 0 failed.
master (rank 0, comm 1) of size 4 is running on: nyx6259
slave1 (rank 1, comm 1) of size 4 is running on: nyx6215
slave2 (rank 2, comm 1) of size 4 is running on: nyx6225
slave3 (rank 3, comm 1) of size 4 is running on: nyx6147
>
> # The command we want to run on all the nodes/processors we have
> mpi.remote.exec(paste("I am ", mpi.comm.rank(), " of ",
+                        mpi.comm.size(), " on ",
+                        Sys.info()
+                        [c("nodename")]
+                      )
+                )
$slave1
[1] "I am  1  of  4  on  nyx6215.arc-ts.umich.edu"

$slave2
[1] "I am  2  of  4  on  nyx6225.arc-ts.umich.edu"

$slave3
[1] "I am  3  of  4  on  nyx6147.arc-ts.umich.edu"

>
> # Stop the worker processes
> mpi.close.Rslaves()
[1] 1
>
> # Close down the MPI processes and quit R
> mpi.quit()