Interactive Bodo Cluster Setup using IPyParallel¶
Bodo can be used with IPyParallel to allow interactive code execution on a local or remote cluster.
Getting started on your machine¶
Install IPyParallel, JupyterLab, and Bodo in your conda environment. Make sure you activate your conda environment first:
Start a JupyterLab server:
Start a new notebook and run the following code in a cell to start an IPyParallel cluster:
import ipyparallel as ipp
import psutil
n = min(psutil.cpu_count(logical=False), 8)
rc = ipp.Cluster(engines='mpi', n=n).start_and_connect_sync(activate=True)
This starts a local N-core MPI cluster on your machine, where N is the minimum of the number of physical cores on your machine and 8. You can now start using the %%px cell magic to parallelize your code execution, or use %autopx to run all cells on the IPyParallel cluster by default. Read more here.
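The engine count above is computed as the minimum of the physical core count and 8. Since `psutil.cpu_count(logical=False)` can return `None` when the physical core count cannot be determined, a slightly more defensive version may be useful. The following is a minimal sketch; the helper name `engine_count` and the fallback to `os.cpu_count()` are assumptions made for illustration, not part of Bodo or IPyParallel.

```python
import os


def engine_count(physical_cores, cap=8):
    """Return how many engines to start for a detected physical core count.

    `physical_cores` is the value returned by psutil.cpu_count(logical=False),
    which may be None when the count cannot be determined.
    """
    if physical_cores is None:
        # Assumption: fall back to the logical CPU count, or 1 if unknown.
        physical_cores = os.cpu_count() or 1
    # Never start fewer than one engine or more than `cap` engines.
    return max(1, min(physical_cores, cap))
```

The result can be passed as `n` when creating the cluster, for example `n = engine_count(psutil.cpu_count(logical=False))`.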
Verifying your setup¶
Run the following code to verify that your IPyParallel cluster is set up correctly:
%%px
import bodo
print(f"Hello World from rank {bodo.get_rank()}. Total ranks={bodo.get_size()}")
The correct output is:
Hello World from rank 0. Total ranks=N
Hello World from rank 1. Total ranks=N
...
Hello World from rank N-1. Total ranks=N
Here, N is the minimum of the number of physical cores on your machine and 8.
Running on multiple hosts¶
To start an IPyParallel cluster across multiple hosts:
- Install IPyParallel and Bodo on all hosts:
Note
mamba is a drop-in replacement for conda that uses the same commands and configuration but is much faster. See here for details.
- Install JupyterLab on one of the hosts. Let's call it the controller node:
- Set up passwordless SSH between each of these hosts (this is needed for mpiexec). See the section on passwordless SSH for instructions.
- The controller node must be able to connect to all engines via TCP on any port. If you have a restricted network, please refer to the IPyParallel documentation for other options such as SSH tunneling.
- Create a hostfile that contains a list of the IP addresses or host names where you want to launch engines.
Note
Make sure your hostfile is in the following format, with one host per line:
```
ip_1
ip_2
...
```
You can find more information about hostfiles here. Other MPI systems and launchers (such as QSUB/PBS) may use a different user interface for allocating compute nodes.
- Create the default IPython profile on all nodes by executing the following from the controller node:
Now you can start a JupyterLab server on the controller node:
Starting an IPyParallel cluster across multiple hosts requires setting a couple of additional configuration options. Start a new notebook and run the following code in a cell:
import ipyparallel as ipp
c = ipp.Cluster(
    engines='mpi',
    n=8,  # Number of engines: set this to the total number of physical cores in your cluster
    controller_ip='*',
    controller_args=["--nodb"],
)
# Replace the placeholder with the path to your hostfile
c.engine_launcher_class.mpi_args = ["-f", "<PATH_TO_HOSTFILE>"]
rc = c.start_and_connect_sync()
view = rc.broadcast_view(block=True)
view.activate()
You have now successfully started an IPyParallel cluster across multiple hosts.
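The hostfile and the engine count `n` used in this setup can also be produced programmatically. The sketch below assumes the one-host-per-line layout accepted by MPICH-style launchers and assumes every host has the same number of physical cores. The helper names `write_hostfile` and `total_engines` are hypothetical and not part of Bodo or IPyParallel.

```python
from pathlib import Path


def write_hostfile(hosts, path):
    """Write one host (IP address or host name) per line.

    Blank entries are skipped. Returns the number of hosts written.
    """
    cleaned = [h.strip() for h in hosts if h.strip()]
    Path(path).write_text("\n".join(cleaned) + "\n")
    return len(cleaned)


def total_engines(num_hosts, cores_per_host):
    """Total engine count, assuming identical hosts (an assumption)."""
    return num_hosts * cores_per_host
```

For two hosts with 4 physical cores each, `total_engines(2, 4)` gives the value 8 used for `n` in the example above.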
Verifying your setup¶
Run the following code to verify that your IPyParallel cluster is set up correctly:
%%px
import bodo
import socket
print(f"Hello World from rank {bodo.get_rank()} on host {socket.gethostname()}. Total ranks={bodo.get_size()}")
On a cluster with two hosts running 4 engines, the correct output is:
Hello World from rank 0 on host A. Total ranks=4
Hello World from rank 1 on host A. Total ranks=4
Hello World from rank 2 on host B. Total ranks=4
Hello World from rank 3 on host B. Total ranks=4
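When checking larger clusters, reading the output by eye becomes tedious. The following sketch parses lines in the format printed above and groups ranks by host, raising an error if any rank is missing or duplicated. The function name `ranks_by_host` is hypothetical, and the sketch assumes you have already collected the printed lines as a list of strings.

```python
import re
from collections import defaultdict

LINE_RE = re.compile(
    r"Hello World from rank (\d+) on host (\S+)\. Total ranks=(\d+)"
)


def ranks_by_host(lines):
    """Group reported ranks by host and check that ranks 0..N-1 all appear once."""
    hosts = defaultdict(list)
    total = None
    for line in lines:
        match = LINE_RE.search(line)
        if match is None:
            continue  # ignore unrelated output
        rank, host, size = int(match.group(1)), match.group(2), int(match.group(3))
        hosts[host].append(rank)
        total = size
    seen = sorted(r for ranks in hosts.values() for r in ranks)
    if total is None or seen != list(range(total)):
        raise ValueError(f"expected ranks 0..{total}, got {seen}")
    return dict(hosts)
```

Applied to the four sample lines above, this returns a mapping from host A to ranks 0 and 1, and from host B to ranks 2 and 3.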
Running Bodo on your IPyParallel Cluster¶
You are now ready to run your Bodo code. Here is an example function with Bodo:
%%px
import bodo
import numpy as np
import pandas as pd

@bodo.jit
def process_data(n):
    df = pd.DataFrame({"A": np.arange(n), "B": np.arange(n)**2})
    df["C"] = df.apply(lambda r: 2 * r.A + r.B if r.A > 10 else 0, axis=1)
    return df["C"].sum()

process_data(100000000)
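To sanity-check the result on a small input, it can help to compare against a plain Python version of the same computation. The sketch below is a reference written for illustration only: it does not use Bodo, pandas, or NumPy, and the name `process_data_reference` is hypothetical. For each `a` in `0..n-1` it adds `2*a + a**2` when `a > 10`, and 0 otherwise, which mirrors the column `C` built above.

```python
def process_data_reference(n):
    """Pure-Python reference: sum of 2*a + a**2 over all a in range(n) with a > 10."""
    total = 0
    for a in range(n):
        b = a ** 2  # column "B" in the DataFrame version
        total += 2 * a + b if a > 10 else 0
    return total
```

This is only practical for small `n`, but it gives a value to compare against when calling `process_data` with the same small argument.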
Running from a Python Script¶
You can run code on an IPyParallel cluster from a Python script instead of IPython or JupyterLab as follows:
- Set up the cluster using the same steps as above.
- Define the function you want to run on the cluster:
import inspect
import bodo
import numpy as np
import pandas as pd

@bodo.jit
def process_data(n):
    df = pd.DataFrame({"A": np.arange(n), "B": np.arange(n)**2})
    df["C"] = df.apply(lambda r: 2 * r.A + r.B if r.A > 10 else 0, axis=1)
    return df["C"].sum()

process_data(100000000)
- We define a Python wrapper for process_data called bodo_exec, which will be sent to the engines to compute. This wrapper will call the Bodo function on the engines, collect the result and send it back to the client.
- We can send the source code to be executed at the engines, using the execute method of IPyParallel's DirectView object. After the imports and code definitions are sent to the engines, the computation is started by actually calling the process_data function (now defined on the engines) and returning the result to the client.
def main():
    # `view` and `rc` come from the cluster setup step above
    # remote code execution: import required modules on engines
    view.execute("import pandas as pd")
    view.execute("import numpy as np")
    view.execute("import bodo")
    view.execute("import time")
    # send code of Bodo functions to engines
    bodo_funcs = [process_data]
    for f in bodo_funcs:
        # get source code of Bodo function
        f_src = inspect.getsource(f)
        # execute the source code thus defining the function on engines
        view.execute(f_src).get()
    points = 200000000
    ar = view.apply(bodo_exec, points)
    result = ar.get()
    print("Result is", result)
    rc.close()

main()
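The wrapper `bodo_exec` passed to `view.apply` is described above but its definition is not shown. A minimal sketch, assuming the wrapper only forwards its argument to `process_data` and returns the result, could look like the following. The `process_data` defined here is a trivial placeholder so that the sketch runs on its own; on the engines, the name would refer to the Bodo function defined via `view.execute`.

```python
def process_data(n):
    # Placeholder for illustration only; on the engines this name refers to
    # the Bodo-compiled function whose source was sent with view.execute.
    return sum(range(n))


def bodo_exec(points):
    # Look up process_data by name at call time, so the wrapper uses
    # whichever definition exists in the process that executes it.
    return process_data(points)
```

Keeping the wrapper this thin means all of the actual work stays inside the compiled function, and the wrapper only carries the result back to the client.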
Minimal Configuration for Small Setups¶
IPyParallel starts several processes as part of its cluster. In particular, it launches 2 processes per engine, and a few other processes for the controller. See the IPyParallel documentation for more details. Note that the diagram there does not include the broadcast scheduler (part of the controller), which accounts for 2^(depth+1) - 1 processes. The default is a depth of 1, which makes 3 processes.
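The process counts described above can be worked out with a small calculation. The sketch below encodes only the two figures stated in this section: 2^(depth+1) - 1 processes for the broadcast scheduler, and 2 processes per engine (or 1 when the nanny is disabled). The function names are hypothetical, and the remaining controller processes are deliberately left out because their number is not stated here.

```python
def broadcast_scheduler_processes(depth):
    """Processes used by the broadcast scheduler: 2^(depth+1) - 1."""
    if depth < 0:
        raise ValueError("depth must be non-negative")
    return 2 ** (depth + 1) - 1


def engine_processes(n_engines, nanny=True):
    """Processes used by engines: 2 per engine, or 1 with the nanny disabled."""
    return n_engines * (2 if nanny else 1)
```

With the default depth of 1, `broadcast_scheduler_processes(1)` gives 3, matching the figure above, and 4 engines with the nanny enabled account for 8 processes.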
To keep the number of processes started by IPyParallel clusters minimal, for small setups such as running locally on your machine, we recommend running IPyParallel with the following configuration:
import ipyparallel as ipp

c = ipp.Cluster(engines="mpi", n=4)
c.config.ControllerLauncher.controller_args = [
    "--IPController.broadcast_scheduler_depth=0",
    "--IPController.use_threads=True",
]
As the name suggests, this will run the schedulers in threads, hence decreasing the number of processes. The performance impact of this should be minimal for small setups.
To further reduce the number of processes, you can also use:
c.config.EngineLauncher.engine_args = ["--IPEngine.enable_nanny=False"]
which will use only 1 process per engine instead of 2. We do not recommend this, since it impairs prompt error reporting and the ability to send signals to the engines.