19. Interactive Bodo Cluster Setup using Ipyparallel

Bodo can be used with ipyparallel to allow interactive code execution on a remote cluster. The code can be sent from any client (for example an application or interactive console) and the client can reside on a different host than the cluster. An example use case is web applications (such as visualization) that require real-time processing of large data.

An ipyparallel cluster consists of one controller and multiple engines. The engines are the workers that receive code from the controller and execute it in parallel using Bodo. Engines can run simultaneously on multiple hosts, which lets workloads scale to hundreds or thousands of cores.

In this guide we will explain how to set up an ipyparallel cluster with MPI and run Bodo code on it dynamically.

19.1. Prerequisites

  • Install Bodo on every host (see Installation)

  • Install ipyparallel in your Bodo environment:

    conda install ipyparallel -c conda-forge
    
  • Create an MPI profile for IPython:

    ipython profile create --parallel --profile=mpi
    

19.2. Starting an ipyparallel cluster

There are various ways to start an ipyparallel cluster. For a detailed guide, please refer to the ipyparallel documentation.

In this section we will describe two methods. The first is a simple way to start and use a cluster on your local host. The other is a more general approach that works in many scenarios.

19.2.1. Controller and engines on the same host

Run this command to start a cluster on the same host:

ipcluster start -n N --profile=mpi

# N: total number of engines to launch in your cluster
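Engines take a moment to register with the controller after the cluster starts, so a client that connects immediately may see fewer than N engines. The following is a minimal sketch of a polling helper; wait_for_engines is a hypothetical name introduced here, not part of ipyparallel, and it only assumes a callable that returns the current engine ids (for an ipyparallel client, the ids attribute).

```python
import time


def wait_for_engines(get_ids, expected, timeout=30.0, poll=0.5):
    """Poll until `expected` engines are registered or `timeout` seconds pass.

    get_ids: zero-argument callable returning the current engine ids,
             e.g. `lambda: client.ids` for an ipyparallel Client.
    Returns the list of engine ids, or raises TimeoutError.
    """
    deadline = time.monotonic() + timeout
    while True:
        ids = list(get_ids())
        if len(ids) >= expected:
            return ids
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "only {} of {} engines registered".format(len(ids), expected)
            )
        time.sleep(poll)


# Usage against a running cluster (requires ipyparallel):
#   import ipyparallel as ipp
#   client = ipp.Client(profile='mpi')
#   wait_for_engines(lambda: client.ids, expected=4)
```

Taking a callable instead of the client itself keeps the helper independent of ipyparallel, so it can be exercised without a running cluster.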

19.2.2. Multiple hosts

Although ipcluster can automate most of the process, it may not be directly applicable in all circumstances. If you want to start a remote cluster with ipcluster please refer to the ipyparallel documentation. Here we describe a manual approach that works in a wide range of scenarios:

  • If you have engines on different hosts you have to set up password-less SSH between each of these hosts (this is needed for mpiexec). Configuring this is outside the scope of this guide.

  • The controller must be able to connect to all engines via TCP on any port. If you have a restricted network please refer to the ipyparallel documentation for other options like SSH tunneling.

  • Start the controller (the controller can run on a different host than the engines):

    ipcontroller --profile mpi --ip '*'
    
  • Copy the ipcontroller-engine.json file generated by the controller to each host that will run engines. The file is located at ~/.ipython/profile_mpi/security/ipcontroller-engine.json; copy it to the same directory on each engine host.

  • Start the engines with mpiexec (run this command on one of the hosts in your cluster):

    mpiexec -n N -machinefile machinefile python -m ipyparallel.engine --mpi --profile-dir ~/.ipython/profile_mpi --cluster-id ''
    
    # N: total number of engines to launch in your cluster
    # machinefile: file containing the list of IP addresses or host names where you want to launch engines.
    

Note

Make sure your machinefile is in the following format:

ip1:N1
ip2:N2
...

where Ni is the number of engines that you want to run on host i. This is necessary for mpiexec to map MPI ranks to hosts in consecutive order.
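The machinefile and the matching mpiexec command can be generated from a single mapping of hosts to engine counts, which keeps N equal to the sum of the Ni values. This is a minimal sketch: machinefile_text and engine_command are hypothetical helper names, and the IP addresses are placeholders. The command arguments are the ones shown above.

```python
import os


def machinefile_text(engines_per_host):
    """Return machinefile contents, one `host:count` line per host."""
    return "".join(
        "{}:{}\n".format(host, count) for host, count in engines_per_host.items()
    )


def engine_command(engines_per_host, machinefile="machinefile",
                   profile_dir="~/.ipython/profile_mpi"):
    """Build the mpiexec command from this guide as an argument list."""
    total = sum(engines_per_host.values())  # N = N1 + N2 + ...
    return [
        "mpiexec", "-n", str(total), "-machinefile", machinefile,
        "python", "-m", "ipyparallel.engine", "--mpi",
        # expand "~" here because no shell will do it for an argument list
        "--profile-dir", os.path.expanduser(profile_dir),
        "--cluster-id", "",
    ]


hosts = {"10.0.0.1": 2, "10.0.0.2": 2}  # placeholder addresses (assumption)
print(machinefile_text(hosts), end="")
print(engine_command(hosts))
```

The argument list can be passed to subprocess.run once the text has been written to the machinefile path.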

19.3. Verify setup

You can use this simple test to verify that your cluster is set up correctly:

import ipyparallel as ipp

def test_MPI():
    comm = MPI.COMM_WORLD
    return "Hello World from rank {} on host {}. total ranks={}".format(comm.Get_rank(), MPI.Get_processor_name(), comm.Get_size())

def main():
    client = ipp.Client(profile='mpi')
    dview = client[:]
    # block=True waits for the import, so a missing mpi4py is reported here
    dview.execute("from mpi4py import MPI", block=True)
    ar = dview.apply(test_MPI)
    result = ar.get()
    for out in result:
        print(out)
    client.close()

main()

On a cluster with two hosts (A and B) running 2 engines each, 4 engines in total, the expected output is:

Hello World from rank 0 on host A. total ranks=4
Hello World from rank 1 on host A. total ranks=4
Hello World from rank 2 on host B. total ranks=4
Hello World from rank 3 on host B. total ranks=4
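For larger clusters it can help to check this output programmatically. The sketch below assumes the exact message format used by test_MPI above; check_hello_output is a hypothetical helper name. It verifies that all engines report the same world size and that ranks 0..size-1 each appear exactly once. If the engines were started without MPI, each one would typically report rank 0 with total ranks=1, which this check rejects.

```python
import re

LINE = re.compile(
    r"Hello World from rank (\d+) on host (\S+)\. total ranks=(\d+)"
)


def check_hello_output(lines):
    """Return {host: [ranks]} if the lines describe one consistent MPI world."""
    hosts = {}
    ranks = []
    sizes = set()
    for line in lines:
        m = LINE.fullmatch(line.strip())
        if m is None:
            raise ValueError("unexpected line: {!r}".format(line))
        rank, host, size = int(m.group(1)), m.group(2), int(m.group(3))
        ranks.append(rank)
        sizes.add(size)
        hosts.setdefault(host, []).append(rank)
    if len(sizes) != 1:
        raise ValueError(
            "engines disagree on total ranks: {}".format(sorted(sizes))
        )
    size = sizes.pop()
    if sorted(ranks) != list(range(size)):
        raise ValueError(
            "expected ranks 0..{}, got {}".format(size - 1, sorted(ranks))
        )
    return hosts
```

In the test script above, the list returned by ar.get() can be passed directly to check_hello_output.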

19.4. Bodo example

Before running Bodo code on your cluster, it is highly recommended that you verify that the cluster is running correctly and is MPI-enabled, as described in the Verify setup section above.

Now let’s run some Bodo code on the cluster. The following runs the Pi calculation example in parallel on the cluster:

import ipyparallel as ipp
import inspect
import bodo

@bodo.jit
def calc_pi(n):
    t1 = time.time()
    x = 2 * np.random.ranf(n) - 1
    y = 2 * np.random.ranf(n) - 1
    pi = 4 * np.sum(x ** 2 + y ** 2 < 1) / n
    print("Execution time:", time.time() - t1, "\nresult:", pi)
    return pi

def bodo_exec(points):
    return calc_pi(points)

def main():
    client = ipp.Client(profile='mpi')
    dview = client[:]

    # remote code execution: import required modules on engines
    dview.execute("import numpy as np")
    dview.execute("import bodo")
    dview.execute("import time")

    # send code of Bodo functions to engines
    bodo_funcs = [calc_pi]
    for f in bodo_funcs:
        # get source code of Bodo function
        f_src = inspect.getsource(f)
        # execute the source code thus defining the function on engines
        dview.execute(f_src).get()

    points = 200000000
    ar = dview.apply(bodo_exec, points)
    result = ar.get()
    print("Result is", result)

    client.close()

main()

The first step consists of sending the imports and function definitions to the engines; no computation happens at this point. In this example this is done with the execute method of ipyparallel’s DirectView object, which sends source code to be executed on the engines.
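The difference between defining a function from its source and calling it can be illustrated locally, without a cluster. In this sketch a plain dictionary stands in for one engine's namespace and Python's built-in exec stands in for dview.execute; square is a hypothetical function used only for illustration.

```python
# Source text, as inspect.getsource() would return it for a plain function.
func_src = "def square(x):\n    return x * x\n"

engine_ns = {}  # stands in for one engine's namespace

# Analogous to dview.execute(func_src): the function is defined,
# but nothing is computed yet.
exec(func_src, engine_ns)
assert "square" in engine_ns

# Analogous to the later dview.apply(...) call: now the function runs.
result = engine_ns["square"](3)
print(result)  # prints 9
```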

The final step starts the computation by calling the calc_pi function (now defined on the engines) and returning the result to the client. We define a Python wrapper called bodo_exec, which is sent to the engines as part of the dview.apply call. The wrapper calls the Bodo function on each engine, and ipyparallel sends each engine’s return value back to the client, so ar.get() yields a list with one entry per engine.
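Because dview.apply targets every engine, the client receives one value per engine. The sketch below collapses that list into a single number, under the assumption that every engine returns the same reduced value of pi. That assumption should be checked against your Bodo version. single_result is a hypothetical helper name.

```python
import math


def single_result(results, rel_tol=1e-12):
    """Collapse the per-engine list from ar.get() into one value.

    Assumes all engines return the same value; raises ValueError if not.
    """
    if not results:
        raise ValueError("no engines returned a result")
    first = results[0]
    for i, value in enumerate(results):
        if not math.isclose(value, first, rel_tol=rel_tol):
            raise ValueError(
                "engine {} returned {}, expected {}".format(i, value, first)
            )
    return first
```

In the example above this would be called as single_result(result) after ar.get().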