Interactive Bodo Cluster Setup using IPyParallel

Bodo can be used with IPyParallel to allow interactive code execution on a local or remote cluster.

Getting started on your machine

Install IPyParallel, JupyterLab, and Bodo in your conda environment:

conda install bodo ipyparallel=8.1 jupyterlab=3 -c bodo.ai -c conda-forge

Start a JupyterLab server:

jupyter lab

Start a new notebook and run the following code in a cell to start an IPyParallel cluster:

import ipyparallel as ipp
import psutil

n = min(psutil.cpu_count(logical=False), 8)
rc = ipp.Cluster(engines='mpi', n=n).start_and_connect_sync(activate=True)

This starts a local N-core MPI cluster on your machine, where N is the minimum of the number of cores on your machine and 8. You can now start using the %%px cell magic to parallelize your code execution, or use %autopx to run all cells on the IPyParallel cluster by default. Read more here.
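
`psutil.cpu_count(logical=False)` can return `None` when the physical core count cannot be determined, in which case `min(None, 8)` raises a `TypeError`. A minimal sketch of a more defensive choice of `n` follows. The helper name `pick_engine_count` and the fallback to a single engine are assumptions for illustration, not part of Bodo or IPyParallel.

```python
def pick_engine_count(physical_cores, cap=8):
    """Return how many engines to start: at least 1, at most `cap`.

    `physical_cores` is whatever psutil.cpu_count(logical=False) returned,
    which may be None if the count could not be determined.
    """
    if physical_cores is None or physical_cores < 1:
        # Assumed fallback: start a single engine rather than fail.
        return 1
    return min(physical_cores, cap)


print(pick_engine_count(4))     # fewer cores than the cap
print(pick_engine_count(16))    # more cores than the cap
print(pick_engine_count(None))  # core count unknown
```

In the notebook cell above, this would be used as `n = pick_engine_count(psutil.cpu_count(logical=False))`.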

Verifying your setup

Run the following code to verify that your IPyParallel cluster is set up correctly:

%%px
import bodo
print(f"Hello World from rank {bodo.get_rank()}. Total ranks={bodo.get_size()}")

The correct output is:

Hello World from rank 0. Total ranks=N
Hello World from rank 1. Total ranks=N
...
Hello World from rank N-1. Total ranks=N

Where N is the minimum of the number of cores on your machine and 8.

Running on multiple hosts

To start an IPyParallel cluster across multiple hosts:

  • Install IPyParallel and Bodo on all hosts:
conda install bodo ipyparallel=8.1 -c bodo.ai -c conda-forge
  • Install JupyterLab on one of the hosts. Let's call it the controller node:
conda install jupyterlab=3 -c bodo.ai -c conda-forge
  • Set up passwordless SSH between each of these hosts (this is needed for mpiexec). See the section on passwordless ssh for instructions.

  • The controller node must be able to connect to all engines via TCP on any port. If you have a restricted network, please refer to the IPyParallel documentation for other options such as SSH tunneling.

  • Create a hostfile that contains a list of the IP addresses or host names where you want to launch engines.

!!! note

  Make sure your hostfile is in the following format:
  ```
  ip_1
  ip_2
  ...
  ```

You can find more information about hostfiles here. It is important to note that other MPI systems and launchers (such as QSUB/PBS) may use a different user interface for the allocation of computational nodes.

  • Create the default IPython profile on all nodes by executing the following from the controller node:
    mpiexec -ppn 1 -f <PATH_TO_HOSTFILE> ipython profile create
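
The hostfile from the steps above could also be generated with a short script. This is a sketch that assumes the one-host-per-line layout commonly used with `mpiexec -f`. The helper `write_hostfile` and the host names `node-a` and `node-b` are hypothetical.

```python
import tempfile
from pathlib import Path


def write_hostfile(hosts, path):
    """Write one host per line, skipping blank entries; return the hosts written."""
    cleaned = [h.strip() for h in hosts if h.strip()]
    Path(path).write_text("\n".join(cleaned) + "\n")
    return cleaned


# Hypothetical host names; replace with your own IP addresses or host names.
hostfile = Path(tempfile.mkdtemp()) / "hostfile"
written = write_hostfile(["node-a", " node-b ", ""], hostfile)

# The same path is what mpiexec receives through its -f option.
mpi_args = ["-f", str(hostfile)]
print(hostfile.read_text())
```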
    

Now you can start a JupyterLab server on the controller node:

jupyter lab

Starting an IPyParallel cluster across multiple hosts requires setting a couple of additional configuration options. Start a new notebook and run the following code in a cell:

import ipyparallel as ipp
c = ipp.Cluster(engines='mpi',
                n=8,  # Number of engines: Set this to the total number of physical cores in your cluster
                controller_ip='*',
                controller_args=["--nodb"])
c.engine_launcher_class.mpi_args = ["-f", "<PATH_TO_HOSTFILE>"]  # replace with the path to your hostfile
rc = c.start_and_connect_sync()
view = rc.broadcast_view(block=True)
view.activate()

You have now successfully started an IPyParallel cluster across multiple hosts.

Verifying your setup

Run the following code to verify that your IPyParallel cluster is set up correctly:

%%px
import bodo
import socket
print(f"Hello World from rank {bodo.get_rank()} on host {socket.gethostname()}. Total ranks={bodo.get_size()}")

On a cluster with two hosts running 4 engines, the correct output is:

Hello World from rank 0 on host A. Total ranks=4
Hello World from rank 1 on host A. Total ranks=4
Hello World from rank 2 on host B. Total ranks=4
Hello World from rank 3 on host B. Total ranks=4
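
On larger clusters, checking this output by eye gets tedious. The following sketch parses the printed lines and checks that every rank from 0 to N-1 appears exactly once, then counts engines per host. The helper `check_hello_output` is hypothetical, and it assumes you have captured the cell output as a string.

```python
import re
from collections import Counter

LINE_RE = re.compile(
    r"Hello World from rank (\d+) on host (\S+)\. Total ranks=(\d+)"
)


def check_hello_output(text):
    """Return (ok, engines_per_host) for the captured verification output."""
    ranks = []
    sizes = set()
    per_host = Counter()
    for rank, host, size in LINE_RE.findall(text):
        ranks.append(int(rank))
        sizes.add(int(size))
        per_host[host] += 1
    # All lines must agree on the total, and ranks must be exactly 0..N-1.
    ok = len(sizes) == 1 and sorted(ranks) == list(range(next(iter(sizes))))
    return ok, dict(per_host)


sample = """\
Hello World from rank 0 on host A. Total ranks=4
Hello World from rank 1 on host A. Total ranks=4
Hello World from rank 2 on host B. Total ranks=4
Hello World from rank 3 on host B. Total ranks=4
"""
ok, per_host = check_hello_output(sample)
print(ok, per_host)
```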

Running Bodo on your IPyParallel Cluster

You are now ready to run your Bodo code. Here is an example function with Bodo:

%%px
import bodo
import numpy as np
import pandas as pd

@bodo.jit
def process_data(n):
    df = pd.DataFrame({"A": np.arange(n), "B": np.arange(n)**2})
    df["C"] = df.apply(lambda r: 2* r.A + r.B if r.A > 10 else 0, axis=1)
    return df["C"].sum()

process_data(100000000)
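
To sanity-check what `process_data` returns, the same quantity can be computed in plain Python. For each `A` greater than 10 the function adds `2*A + A**2`, so the total has a closed form. The sketch below is an independent reference, not Bodo code, and the names `expected_sum` and `brute_force` are hypothetical. It uses Python's arbitrary-precision integers, whereas the DataFrame columns are 64-bit, so for very large `n` the two may differ because of integer overflow. Compare with a small `n` first.

```python
def brute_force(n):
    """Direct translation of the row-wise rule: 2*A + A**2 when A > 10, else 0."""
    return sum(2 * a + a * a for a in range(n) if a > 10)


def expected_sum(n):
    """Closed form of the same sum over A = 11 .. n-1."""
    if n <= 11:
        return 0
    m = n - 1
    sum_squares = m * (m + 1) * (2 * m + 1) // 6  # 0^2 + 1^2 + ... + m^2
    sum_twice = m * (m + 1)                       # 2 * (0 + 1 + ... + m)
    # Remove the terms for A = 0 .. 10, which the rule maps to 0: 385 + 110.
    return sum_squares + sum_twice - 495


print(expected_sum(12))
print(expected_sum(1000) == brute_force(1000))
```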

Running from a Python Script

You can run code on an IPyParallel cluster from a Python script instead of IPython or JupyterLab as follows:

  • Set up the cluster using the same steps as above.

  • Define the function you want to run on the cluster:

import inspect
import bodo
import numpy as np
import pandas as pd

@bodo.jit
def process_data(n):
    df = pd.DataFrame({"A": np.arange(n), "B": np.arange(n)**2})
    df["C"] = df.apply(lambda r: 2* r.A + r.B if r.A > 10 else 0, axis=1)
    return df["C"].sum()

process_data(100000000)
  • We define a Python wrapper for process_data called bodo_exec which will be sent to the engines to compute. This wrapper will call the Bodo function on the engines, collect the result and send it back to the client.
def bodo_exec(points):
    return process_data(points)
  • We can send the source code to be executed at the engines, using the execute method of IPyParallel's DirectView object. After the imports and code definitions are sent to the engines, the computation is started by actually calling the process_data function (now defined on the engines) and returning the result to the client.
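def main():
    # `rc` (the connected client) and `view` (a view on its engines) are
    # assumed to exist already from the cluster setup step.
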
    # remote code execution: import required modules on engines
    view.execute("import pandas as pd")
    view.execute("import numpy as np")
    view.execute("import bodo")
    view.execute("import time")

    # send code of Bodo functions to engines
    bodo_funcs = [process_data]
    for f in bodo_funcs:
        # get source code of Bodo function
        f_src = inspect.getsource(f)
        # execute the source code thus defining the function on engines
        view.execute(f_src).get()

    points = 200000000
    ar = view.apply(bodo_exec, points)
    result = ar.get()
    print("Result is", result)

    rc.close()

main()
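
The wrapper `bodo_exec` refers to `process_data` by name, yet it works because that name is looked up on the engines, where the source was executed, rather than on the client. The following is a local simulation of that idea using only the standard library. It is not IPyParallel's actual implementation. `FakeEngine` is a hypothetical stand-in for an engine, and `FUNC_SRC` stands in for the text that `inspect.getsource` would return (plain Python here, without `@bodo.jit`).

```python
import types

# Stand-in for the source text sent to the engines.
FUNC_SRC = """
def process_data(n):
    return sum(2 * a + a * a for a in range(n) if a > 10)
"""


class FakeEngine:
    """Hypothetical engine: nothing more than its own namespace."""

    def __init__(self):
        self.namespace = {}

    def execute(self, source):
        # Defines whatever the source declares inside this engine's namespace.
        exec(source, self.namespace)

    def apply(self, func, *args):
        # Rebind the function's code to the engine namespace so that global
        # names such as process_data are resolved there.
        remote = types.FunctionType(func.__code__, self.namespace, func.__name__)
        return remote(*args)


def bodo_exec(points):
    return process_data(points)  # only defined on the engines


engines = [FakeEngine() for _ in range(2)]
for engine in engines:
    engine.execute(FUNC_SRC)

results = [engine.apply(bodo_exec, 12) for engine in engines]
print(results)
```

Each simulated engine returns its own result, which mirrors why `ar.get()` in the script yields one value per engine.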

Minimal Configuration for Small Setups

IPyParallel starts several processes as part of its cluster. In particular, it launches 2 processes per engine and a few other processes for the controller. See IPyParallel documentation for more details. Note that the diagram does not include the broadcast scheduler (part of the controller), which accounts for 2^(depth+1) - 1 processes. The default is a depth of 1, which makes 3 processes.
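
As a worked example of these counts, the sketch below reads the broadcast scheduler figure as 2^(depth+1) - 1, which matches the 3 processes stated for the default depth of 1. It counts only engine and broadcast scheduler processes, not the controller's other processes. The function names are hypothetical.

```python
def broadcast_scheduler_processes(depth):
    """Processes used by the broadcast scheduler tree: 2^(depth+1) - 1."""
    return 2 ** (depth + 1) - 1


def engine_processes(n_engines, per_engine=2):
    """Processes used by the engines, at 2 per engine by default."""
    return n_engines * per_engine


for depth in (0, 1, 2):
    print(depth, broadcast_scheduler_processes(depth))

print(engine_processes(4))
```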

To keep the number of processes started by IPyParallel clusters minimal, for small setups such as running locally on your machine, we recommend running IPyParallel with the following configuration:

c = ipp.Cluster(engines="mpi", n=4)
c.config.ControllerLauncher.controller_args = [
    "--IPController.broadcast_scheduler_depth=0",
    "--IPController.use_threads=True",
]

As the name suggests, this will run the schedulers in threads, hence decreasing the number of processes. The performance impact of this should be minimal for small setups. To further reduce the number of processes, you can also use:

c.config.EngineLauncher.engine_args = ["--IPEngine.enable_nanny=False"]

which will only use 1 process per engine instead of 2. We do not recommend this, since it impacts prompt error reporting and the ability to send signals to the engines.

Useful IPyParallel References