3. Getting Started
This section provides a short tutorial that covers the basics of using Bodo and explains its important concepts.
To install Bodo, please follow instructions here
3.1. Parallel Pandas with Bodo
Bodo automatically parallelizes and optimizes standard Python programs that use pandas and NumPy, without the need to rewrite your code. Bodo can scale your analytics code to thousands of cores, providing orders-of-magnitude speedups depending on program characteristics.
Bodo uses MPI for efficient communication, which is usually much faster than alternative methods.
Note: For Jupyter Notebook, follow setup here. Then, at the top of the code cell, use the %%px magic to run on MPI engines.
3.1.1. Generate data
To begin, let’s create a Python script named generate_data.py with the code below to generate a simple dataset (the size of this dataframe in memory is approximately 305 MB, and the size of the written Parquet file is 77 MB):
import pandas as pd
import numpy as np

NUM_GROUPS = 30
NUM_ROWS = 20_000_000

df = pd.DataFrame({
    "A": np.arange(NUM_ROWS) % NUM_GROUPS,
    "B": np.arange(NUM_ROWS)
})
df.to_parquet("example1.pq")
print(df)
Run the following command from the terminal to execute the script:
python generate_data.py
Output:
A B
0 0 0
1 1 1
2 2 2
3 3 3
4 4 4
... .. ...
19999995 15 19999995
19999996 16 19999996
19999997 17 19999997
19999998 18 19999998
19999999 19 19999999
[20000000 rows x 2 columns]
To check that the file was generated, list the directory:
ls -lh
-rw-r--r-- 1 bodo staff 77M Feb 12 15:30 example1.pq
-rw-r--r-- 1 bodo staff 216B Feb 12 15:30 generate_data.py
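The in-memory size quoted above can be sanity-checked with a little arithmetic. This is a rough sketch that assumes both columns are stored as 64-bit integers (an assumption; the section does not state the dtype) and that ignores the index and any pandas overhead:

```python
NUM_ROWS = 20_000_000
NUM_COLUMNS = 2
BYTES_PER_VALUE = 8  # assumption: both columns are int64

total_bytes = NUM_ROWS * NUM_COLUMNS * BYTES_PER_VALUE
total_mib = total_bytes / 2**20  # bytes -> MiB

print(f"{total_bytes:,} bytes is about {total_mib:.0f} MiB")
```

Under these assumptions the estimate lands at roughly 305 MiB, in line with the figure above. The Parquet file is smaller because the format encodes and compresses column data.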
3.1.2. Data Analysis
Now let’s read and process this dataframe, first using plain Python and pandas.
Create another Python script named analyze_data.py with the following code and run it with the python command:
import pandas as pd

def test():
    df = pd.read_parquet("example1.pq")
    df2 = df.groupby("A").sum()
    m = df2.B.max()
    print(m)

test()
python analyze_data.py
Output:
6666676000003
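The printed value can be cross-checked without pandas. Each group's B values form an arithmetic progression (group g holds g, g + 30, g + 60, ...), so its sum has a closed form. The helper name `group_sum` below is only illustrative:

```python
NUM_GROUPS = 30
NUM_ROWS = 20_000_000

def group_sum(group: int) -> int:
    """Sum of all B values whose row index i satisfies i % NUM_GROUPS == group."""
    members = range(group, NUM_ROWS, NUM_GROUPS)  # arithmetic progression
    # Sum of an arithmetic progression: count * (first + last) / 2.
    return len(members) * (members[0] + members[-1]) // 2

sums = {g: group_sum(g) for g in range(NUM_GROUPS)}
best_group = max(sums, key=sums.get)
print(best_group, sums[best_group])  # prints: 19 6666676000003
```

Group 19 wins because groups 0 through 19 each contain one more row than groups 20 through 29, and 19 is the largest offset among them.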
Now let’s run it with Bodo in parallel. To do this, we only need to modify analyze_data.py to add import bodo and the bodo.jit decorator on the function, and then run the program with MPI (using mpiexec) as shown below.
import pandas as pd
import bodo

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    df2 = df.groupby("A").sum()
    m = df2.B.max()
    print(m)

test()
mpiexec -n 4 python analyze_data.py
Output:
6666676000003
Although the program appears to be a regular sequential Python program, Bodo compiles and transforms the decorated code (the test function in this example) under the hood, so that it can run in parallel on many cores. Each core operates on a different chunk of the data and communicates with other cores when necessary.
3.1.3. Prints
Bodo prints replicated values like m only once (on process 0) to avoid redundant printing, but we can use bodo.parallel_print instead of print to see prints on all processes:
import pandas as pd
import bodo

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    df2 = df.groupby("A").sum()
    m = df2.B.max()
    bodo.parallel_print(m)

test()
Command:
mpiexec -n 4 python analyze_data.py
Output:
6666676000003
6666676000003
6666676000003
6666676000003
3.1.4. Parallel Data Read
Bodo can read data from storage such as Parquet files in parallel. This means that each process reads only its own chunk of data (which can be proportionally faster than a sequential read). The example below demonstrates parallel read by printing data chunks on different cores. Save the code in a new Python script named read_data.py and run it with mpiexec:
import pandas as pd
import bodo

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    print(df)

test()
Command:
mpiexec -n 4 python read_data.py
Output:
A B
0 0 0
1 1 1
2 2 2
3 3 3
4 4 4
... .. ...
4999995 15 4999995
4999996 16 4999996
4999997 17 4999997
4999998 18 4999998
4999999 19 4999999
[5000000 rows x 2 columns]
A B
5000000 20 5000000
5000001 21 5000001
5000002 22 5000002
5000003 23 5000003
5000004 24 5000004
... .. ...
9999995 5 9999995
9999996 6 9999996
9999997 7 9999997
9999998 8 9999998
9999999 9 9999999
[5000000 rows x 2 columns]
A B
10000000 10 10000000
10000001 11 10000001
10000002 12 10000002
10000003 13 10000003
10000004 14 10000004
... .. ...
14999995 25 14999995
14999996 26 14999996
14999997 27 14999997
14999998 28 14999998
14999999 29 14999999
[5000000 rows x 2 columns]
A B
15000000 0 15000000
15000001 1 15000001
15000002 2 15000002
15000003 3 15000003
15000004 4 15000004
... .. ...
19999995 15 19999995
19999996 16 19999996
19999997 17 19999997
19999998 18 19999998
19999999 19 19999999
[5000000 rows x 2 columns]
Looking at column B, we can clearly see that each process has a separate chunk of the original dataframe.
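The row ranges in the output above are consistent with a simple contiguous block split. The sketch below reproduces those ranges; `block_range` is a hypothetical helper, and how Bodo actually divides rows (especially when the row count is not evenly divisible) is not described in this section:

```python
def block_range(n_rows: int, n_ranks: int, rank: int) -> tuple:
    """Return the half-open row range [start, end) owned by `rank`."""
    start = rank * n_rows // n_ranks
    end = (rank + 1) * n_rows // n_ranks
    return start, end

# Reproduce the four chunks shown in the output above.
for rank in range(4):
    start, end = block_range(20_000_000, 4, rank)
    print(f"rank {rank}: rows {start} to {end - 1}")
```

With 20,000,000 rows and 4 processes, each rank gets 5,000,000 consecutive rows, matching the first and last values of column B in each printed chunk.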
3.1.5. Parallelizing Computation
[Figure: Groupby shuffle communication pattern]
Bodo parallelizes computation automatically by dividing the work between cores and performing the necessary data communication. For example, the groupby operation in our example needs the data of each group to be on the same processor. This requires shuffling data across multiple cores/processes.
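The shuffle pattern can be imitated in plain Python to make the data movement concrete. This is only a single-process simulation on a tiny stand-in dataset, not Bodo's implementation; the `owner` rule that assigns keys to ranks is a hypothetical choice made for illustration:

```python
from collections import defaultdict

N_RANKS = 4
NUM_GROUPS = 30
NUM_ROWS = 120  # small stand-in for the 20,000,000-row dataset

# Each simulated rank starts with a contiguous chunk of (A, B) rows.
rows = [(i % NUM_GROUPS, i) for i in range(NUM_ROWS)]
chunk = NUM_ROWS // N_RANKS
local_rows = [rows[r * chunk:(r + 1) * chunk] for r in range(N_RANKS)]

def owner(key: int) -> int:
    """Hypothetical partitioning rule: which rank aggregates this key."""
    return key % N_RANKS

# Shuffle: every rank sends each row to the rank that owns its key.
received = [[] for _ in range(N_RANKS)]
for sender in range(N_RANKS):
    for key, value in local_rows[sender]:
        received[owner(key)].append((key, value))

# Local aggregation: each rank now holds every row of the groups it owns.
partial_results = []
for r in range(N_RANKS):
    sums = defaultdict(int)
    for key, value in received[r]:
        sums[key] += value
    partial_results.append(dict(sums))

# Reference: the same groupby-sum computed sequentially.
expected = defaultdict(int)
for key, value in rows:
    expected[key] += value

for r, part in enumerate(partial_results):
    print(f"rank {r} owns groups {sorted(part)}")
```

After the shuffle, no group is split across ranks, so each rank can finish its sums without further communication, and the per-rank results together equal the sequential answer.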
3.1.6. Parallel Write
Bodo can write data to storage in parallel as well. Save the script below in a file named save_data.py and run it with mpiexec:
import pandas as pd
import bodo

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    df2 = df.groupby("A").sum()
    df2.to_parquet("example1-df2.pq")

test()
Command:
mpiexec -n 4 python save_data.py
Bodo will generate a folder named example1-df2.pq where each process writes its chunk in a separate file as shown below:
ls -lh example1-df2.pq
total 32
-rw-r--r-- 1 bodo staff 1.7K Feb 15 11:57 part-00.parquet
-rw-r--r-- 1 bodo staff 1.7K Feb 15 11:57 part-01.parquet
-rw-r--r-- 1 bodo staff 1.7K Feb 15 11:57 part-02.parquet
-rw-r--r-- 1 bodo staff 1.7K Feb 15 11:57 part-03.parquet
Now, let’s save the code below in read_df2.py to read and print the results with pandas:
import pandas as pd
df = pd.read_parquet("example1-df2.pq")
print(df)
Command:
python read_df2.py
Output:
B
A
0 6666663333330
4 6666665999998
6 6666667333332
16 6666674000002
20 6666656666670
24 6666659333334
28 6666661999998
1 6666663999997
7 6666667999999
8 6666668666666
11 6666670666667
12 6666671333334
13 6666672000001
15 6666673333335
18 6666675333336
5 6666666666665
19 6666676000003
21 6666657333336
22 6666658000002
23 6666658666668
29 6666662666664
2 6666664666664
3 6666665333331
9 6666669333333
10 6666670000000
14 6666672666668
17 6666674666669
25 6666660000000
26 6666660666666
27 6666661333332
The order of the groupby results generated by Bodo can differ from pandas since Bodo doesn’t automatically sort distributed output data (sorting is expensive and not necessary in many cases). Users can explicitly sort dataframes at any point if desired.
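As a minimal sketch of explicit sorting, the snippet below uses plain pandas on a small stand-in dataframe (the values are made up for illustration) and orders it by its index with sort_index:

```python
import pandas as pd

# Small stand-in for an unordered groupby result indexed by "A".
df = pd.DataFrame({"B": [30, 10, 20]}, index=pd.Index([2, 0, 1], name="A"))

# Sort rows by the group key stored in the index.
sorted_df = df.sort_index()
print(sorted_df)
```

The same call could be applied to the dataframe read back from example1-df2.pq to obtain the groups in ascending order of A.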
3.1.7. Specifying Data Distribution
Bodo automatically distributes data and computation in Bodo functions by analyzing them for parallelization. However, Bodo does not know how input parameters of Bodo functions are distributed, and similarly how the user wants to handle return values. As such, Bodo assumes that input parameters and return values are replicated by default, meaning that every process receives the same input data and returns the same output, as opposed to different data chunks.
Warning
The distribution scheme of input parameters and return values determines the distribution scheme for variables inside the Bodo function that depend on them.
To illustrate this effect, let’s return the groupby output from the Bodo function (saved here as groupby_data.py):
import pandas as pd
import bodo

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    df2 = df.groupby("A").sum()
    return df2

df2 = test()
print(df2)
Command:
mpiexec -n 4 python groupby_data.py
Output:
B
A
0 6666663333330
1 6666663999997
2 6666664666664
3 6666665333331
4 6666665999998
5 6666666666665
6 6666667333332
7 6666667999999
8 6666668666666
9 6666669333333
10 6666670000000
11 6666670666667
12 6666671333334
13 6666672000001
14 6666672666668
15 6666673333335
16 6666674000002
17 6666674666669
18 6666675333336
19 6666676000003
20 6666656666670
21 6666657333336
22 6666658000002
23 6666658666668
24 6666659333334
25 6666660000000
26 6666660666666
27 6666661333332
28 6666661999998
29 6666662666664
B
A
0 6666663333330
1 6666663999997
2 6666664666664
3 6666665333331
4 6666665999998
5 6666666666665
6 6666667333332
7 6666667999999
8 6666668666666
9 6666669333333
10 6666670000000
11 6666670666667
12 6666671333334
13 6666672000001
14 6666672666668
15 6666673333335
16 6666674000002
17 6666674666669
18 6666675333336
19 6666676000003
20 6666656666670
21 6666657333336
22 6666658000002
23 6666658666668
24 6666659333334
25 6666660000000
26 6666660666666
27 6666661333332
28 6666661999998
29 6666662666664
B
A
0 6666663333330
1 6666663999997
2 6666664666664
3 6666665333331
4 6666665999998
5 6666666666665
6 6666667333332
7 6666667999999
8 6666668666666
9 6666669333333
10 6666670000000
11 6666670666667
12 6666671333334
13 6666672000001
14 6666672666668
15 6666673333335
16 6666674000002
17 6666674666669
18 6666675333336
19 6666676000003
20 6666656666670
21 6666657333336
22 6666658000002
23 6666658666668
24 6666659333334
25 6666660000000
26 6666660666666
27 6666661333332
28 6666661999998
29 6666662666664
B
A
0 6666663333330
1 6666663999997
2 6666664666664
3 6666665333331
4 6666665999998
5 6666666666665
6 6666667333332
7 6666667999999
8 6666668666666
9 6666669333333
10 6666670000000
11 6666670666667
12 6666671333334
13 6666672000001
14 6666672666668
15 6666673333335
16 6666674000002
17 6666674666669
18 6666675333336
19 6666676000003
20 6666656666670
21 6666657333336
22 6666658000002
23 6666658666668
24 6666659333334
25 6666660000000
26 6666660666666
27 6666661333332
28 6666661999998
29 6666662666664
/Users/ehsan/dev/bodo/bodo/transforms/distributed_analysis.py:229: BodoWarning: No parallelism found for function 'test'. This could be due to unsupported usage. See distributed diagnostics for more information.
warnings.warn(
As we can see, df2 has the same data on every process. Furthermore, Bodo warns that it didn’t find any parallelism inside the test function. In this example, every process reads the whole input Parquet file and executes the same sequential program. The reason is that Bodo makes sure all variables dependent on df2 have the same distribution, creating an inverse cascading effect.
3.1.8. distributed Flag
The user can tell Bodo what input/output variables should be distributed using the distributed flag:
import pandas as pd
import bodo

@bodo.jit(distributed=["df2"])
def test():
    df = pd.read_parquet("example1.pq")
    df2 = df.groupby("A").sum()
    return df2

df2 = test()
print(df2)
Command:
mpiexec -n 4 python groupby_data.py
Output:
B
A
0 6666663333330
4 6666665999998
6 6666667333332
16 6666674000002
20 6666656666670
24 6666659333334
28 6666661999998
B
A
1 6666663999997
7 6666667999999
8 6666668666666
11 6666670666667
12 6666671333334
13 6666672000001
15 6666673333335
18 6666675333336
B
A
5 6666666666665
19 6666676000003
21 6666657333336
22 6666658000002
23 6666658666668
29 6666662666664
B
A
2 6666664666664
3 6666665333331
9 6666669333333
10 6666670000000
14 6666672666668
17 6666674666669
25 6666660000000
26 6666660666666
27 6666661333332
In this case, the program is fully parallelized and chunks of data are returned to Python on different processes.
3.1.9. Basic benchmarking of the pandas example
Now let’s do some basic benchmarking to observe the effect of Bodo’s automatic parallelization. Here we are only scaling up to 4 cores, but Bodo can scale the same code to thousands of cores in a cluster.
Let’s add timers and run the code again with pandas:
import pandas as pd
import time

def test():
    df = pd.read_parquet("example1.pq")
    t0 = time.time()
    df2 = df.groupby("A").sum()
    m = df2.B.max()
    print("Compute time:", time.time() - t0, "secs")
    return m

result = test()
Command:
python analyze_data.py
Output:
Compute time: 0.5161728858947754 secs
Now let’s measure Bodo’s execution time.
import pandas as pd
import bodo
import time

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    t0 = time.time()
    df2 = df.groupby("A").sum()
    m = df2.B.max()
    print("Compute time:", time.time() - t0, "secs")
    return m

result = test()
Command:
mpiexec -n 4 python analyze_data.py
Output:
Compute time: 0.19900471500022832 secs
As we can see, Bodo computes results faster than pandas using parallel computation. The speedup depends on the data and program characteristics, as well as the number of cores used. Usually, we can continue scaling to many more cores as long as the data is large enough.
Note how we included timers inside the Bodo function. This avoids measuring compilation time since Bodo compiles each jit function the first time it is called. Not measuring compilation time in benchmarking is usually important since:
- Compilation time is often not significant for large computations in real settings, but simple benchmarks are designed to run quickly.
- Functions can potentially be compiled and cached ahead of execution time.
- Compilation happens only once, but the same function may be called multiple times, leading to inconsistent measurements.
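When timers cannot be placed inside the function, a common alternative is to make one untimed warm-up call and time only the later calls. The sketch below shows that general technique in plain Python; `time_after_warmup` and `work` are hypothetical names, and the placeholder workload stands in for a jitted function:

```python
import time

def time_after_warmup(func, *args, repeats=3):
    """Call func once untimed, then return the durations of `repeats` timed calls."""
    func(*args)  # warm-up call: a JIT-compiled function would compile here
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        func(*args)
        durations.append(time.perf_counter() - start)
    return durations

def work(n):
    # Placeholder workload standing in for a jitted function.
    return sum(i * i for i in range(n))

print(time_after_warmup(work, 100_000))
```

Reporting several timed runs also helps reveal the measurement inconsistency mentioned above. Under mpiexec, each process would take its own measurements.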
3.1.10. Pandas User-Defined Functions
User-defined functions (UDFs) offer significant flexibility but have high overhead in pandas. Bodo can accelerate UDFs significantly, allowing flexibility without performance overheads. Let’s modify our example to use UDFs and measure performance again:
import pandas as pd
import time

def test():
    df = pd.read_parquet("example1.pq")
    t0 = time.time()
    df2 = df.groupby("A")["B"].agg((lambda a: (a==1).sum(), lambda a: (a==2).sum(), lambda a: (a==3).sum()))
    m = df2.mean()
    print("Compute time:", time.time() - t0, "secs")
    return m

result = test()
Command:
python analyze_data.py
Output:
Compute time: 3.158751964569092 secs
Running this example with Bodo is significantly faster, even on a single core:
import pandas as pd
import time
import bodo

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    t0 = time.time()
    df2 = df.groupby("A")["B"].agg((lambda a: (a==1).sum(), lambda a: (a==2).sum(), lambda a: (a==3).sum()))
    m = df2.mean()
    print("Compute time:", time.time() - t0, "secs")
    return m

result = test()
Command:
python analyze_data.py
Output:
Compute time: 0.6601650919992608 secs
Bodo’s parallelism improves performance further:
mpiexec -n 4 python analyze_data.py
Output:
Compute time: 0.26972059500076284 secs
3.2. Memory Optimizations in Bodo
Bodo also improves performance by eliminating intermediate array values in computations such as expressions in pandas and NumPy. The Monte Carlo Pi Estimation example demonstrates this effect:
import numpy as np
import time

def calc_pi(n):
    t1 = time.time()
    x = 2 * np.random.ranf(n) - 1
    y = 2 * np.random.ranf(n) - 1
    pi = 4 * np.sum(x**2 + y**2 < 1) / n
    print("Execution time:", time.time()-t1, "\nresult:", pi)

calc_pi(2 * 10**8)
Command:
python pi.py
Output:
Execution time: 9.323673009872437
result: 3.14178548
Bodo is faster even on a single core since it avoids creating arrays altogether:
import numpy as np
import time
import bodo

@bodo.jit
def calc_pi(n):
    t1 = time.time()
    x = 2 * np.random.ranf(n) - 1
    y = 2 * np.random.ranf(n) - 1
    pi = 4 * np.sum(x**2 + y**2 < 1) / n
    print("Execution time:", time.time()-t1, "\nresult:", pi)

calc_pi(2 * 10**8)
Command:
python pi.py
Output:
Execution time: 2.233783724999739
result: 3.14182726
Data-parallel array computations typically scale well too:
import numpy as np
import time
import bodo

@bodo.jit
def calc_pi(n):
    t1 = time.time()
    x = 2 * np.random.ranf(n) - 1
    y = 2 * np.random.ranf(n) - 1
    pi = 4 * np.sum(x**2 + y**2 < 1) / n
    print("Execution time:", time.time()-t1, "\nresult:", pi)

calc_pi(2 * 10**8)
Command:
mpiexec -n 4 python pi.py
Output:
Execution time: 0.6021775219996925
result: 3.14158038
Note: The timings reported here are from runs on a MacBook Pro with a 2 GHz quad-core Intel Core i5 processor.
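To see what eliminating intermediate arrays means, the NumPy version of calc_pi materializes full-length temporaries for x, y, their squares, and the comparison result. The same estimate can be written as one loop that keeps only a counter. The function below is a hand-written illustration of that idea in pure Python (so it is slow), not the code Bodo generates; `calc_pi_fused` and its `seed` parameter are hypothetical:

```python
import random

def calc_pi_fused(n: int, seed: int = 0) -> float:
    """Estimate pi without storing the random samples in arrays."""
    rng = random.Random(seed)
    inside = 0  # the only state kept between samples
    for _ in range(n):
        x = 2 * rng.random() - 1
        y = 2 * rng.random() - 1
        if x * x + y * y < 1:  # point falls inside the unit circle
            inside += 1
    return 4 * inside / n

print(calc_pi_fused(200_000))
```

Memory use here is constant regardless of n, whereas the array version needs memory proportional to n for each temporary.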
For more examples, please check our repo on GitHub
3.3. Unsupported Pandas/Python Features
3.3.1. Supported Pandas Operations
Bodo supports a large subset of Pandas APIs as listed here. Moreover, dataframe schemas (column names and types) should be stable in operations. For example, the key column names passed to groupby have to be constant for the output type to be stable. This example demonstrates the issue:
import bodo
import pandas as pd

@bodo.jit(distributed=False)
def f(a, i):
    column_list = a[:i]  # some computation that cannot be inferred statically
    df = pd.DataFrame({"A": [1, 2, 1], "B": [4, 5, 6]})
    return df.groupby(column_list).sum()

a = ["A", "B"]
i = 1
f(a, i)
---------------------------------------------------------------------------
BodoError Traceback (most recent call last)
<ipython-input-20-8ff85fad034d> in <module>
9 a = ["A", "B"]
10 i = 1
---> 11 f(a, i)
~/dev/bodo/bodo/numba_compat.py in _compile_for_args(***failed resolving arguments***)
841 del args
842 if error:
--> 843 raise error
844
845
BodoError: groupby(): 'by' parameter only supports a constant column label or column labels.
File "<ipython-input-20-8ff85fad034d>", line 7:
def f(a, i):
<source elided>
df = pd.DataFrame({"A": [1, 2, 1], "B": [4, 5, 6]})
return df.groupby(column_list).sum()
^
The code can most often be refactored to compute the key list in regular Python and pass it as an argument to Bodo:
import bodo
import pandas as pd

@bodo.jit(distributed=False)
def f(column_list):
    df = pd.DataFrame({"A": [1, 2, 1], "B": [4, 5, 6]})
    return df.groupby(column_list).sum()

a = ["A", "B"]
i = 1
column_list = a[:i]
f(column_list)
B
A
1 10
2 5
3.3.2. Supported Python Operations
Bodo relies on Numba for supporting basic Python features. Therefore, Python constructs that are not supported by Numba (see Numba documentation here) should be avoided in Bodo programs. For example:
- Context managers: with (except for with bodo.objmode)
- async features
- set, dict and generator comprehensions
- Lists containing values of heterogeneous types:
    myList = [1, 2]
    myList.append("A")
- Dictionaries containing values of heterogeneous types:
    myDict = {"A": 1}
    myDict["B"] = "C"
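One possible way to avoid heterogeneous containers is to separate the values by type in regular Python before they reach a jitted function, so that each list holds a single type. The helper `split_by_type` below is a hypothetical illustration of that restructuring, and whether it fits depends on the program:

```python
from typing import List, Tuple

def split_by_type(values: list) -> Tuple[List[int], List[str]]:
    """Separate a mixed list into a list of ints and a list of strings."""
    ints = [v for v in values if isinstance(v, int)]
    strs = [v for v in values if isinstance(v, str)]
    return ints, strs

mixed = [1, 2, "A"]  # heterogeneous list like the one shown above
numbers, labels = split_by_type(mixed)
print(numbers, labels)  # prints: [1, 2] ['A']
```

Each resulting list has a single element type and can then be passed as a separate argument.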
3.3.3. Parallel Data Structures
Bodo can parallelize Pandas DataFrame and Series data structures, as well as NumPy arrays. However, collections like lists, sets and dictionaries cannot be parallelized yet.