Advanced Parallelism Topics

This section discusses parallelism topics that may be useful for performance tuning and advanced use cases.

Getting/Setting Distributed Data Directly

Distributed data is usually accessed and modified through high-level Pandas and Numpy APIs, but Bodo also allows direct access to distributed data without code modification in many cases. The cases currently supported are listed below, followed by a short sketch:

  1. Getting values using boolean array indexing, e.g. B = A[A > 3]. The output can be distributed, but may be imbalanced (bodo.rebalance() can be used if necessary).

  2. Getting values using a slice, e.g. B = A[::2]. The output can be distributed, but may be imbalanced (bodo.rebalance() can be used if necessary).

  3. Getting a value using a scalar index, e.g. a = A[m]. The output can be replicated.

  4. Setting values using boolean array indexing, e.g. A[A > 3] = a. Only supports setting a scalar or lower-dimension value currently.

  5. Setting values using a slice, e.g. A[::2] = a. Only supports setting a scalar or lower-dimension value currently.

  6. Setting a value using a scalar index, e.g. A[m] = a.
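
The sketch below exercises these cases on a distributed Numpy array inside a jit function. It is a minimal illustration (the array contents, assigned values, and distributed flags are arbitrary choices); whether each result is actually distributed depends on how it is used in the rest of the program.

import bodo
import numpy as np

@bodo.jit(distributed=["A", "B", "C"])
def direct_access_example(n):
    A = np.arange(n)
    B = A[A > 3]   # case 1: boolean array indexing (output may be imbalanced)
    C = A[::2]     # case 2: slice indexing (output may be imbalanced)
    a = A[n - 1]   # case 3: scalar index (output is replicated)
    A[A > 3] = -1  # case 4: setting with a boolean index (scalar value)
    A[::2] = 0     # case 5: setting with a slice (scalar value)
    A[0] = a       # case 6: setting with a scalar index
    return A, B, C, a

A, B, C, a = direct_access_example(10)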

Concatenation Reduction

Some algorithms require generating variable-length output data per input data element. Bodo supports parallelizing this pattern, which we refer to as concatenation reduction. For example:

import bodo
import numpy as np
import pandas as pd

@bodo.jit
def impl(n):
   df = pd.DataFrame()
   for i in bodo.prange(n):
      # each iteration appends a variable-length chunk to the reduction variable df
      df = df.append(pd.DataFrame({"A": np.arange(i)}))

   return df

A common use case is simulation applications that generate possible outcomes based on parameters. For example:

@bodo.jit
def impl():
   params = np.array([0.1, 0.2, 0.5, 1.0, 1.2, 1.5, ..., 100])
   params = bodo.scatterv(params)
   df = pd.DataFrame()
   for i in bodo.prange(len(params)):
      # get_result is a user-defined simulation routine (see the sketch below)
      df = df.append(get_result(params[i]))

   return df

In this example, we chose to manually distribute the parameter array with bodo.scatterv for simplicity, since the workload is compute-heavy and the parameter data is relatively small.
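
get_result above stands in for the user's simulation routine and is not defined in this example. A hypothetical stand-in that returns a variable-length result per parameter might look like the following (the column names and output sizes are arbitrary):

# hypothetical simulation routine; it is jit-compiled so it can be called
# from the jit function above
@bodo.jit
def get_result(p):
    n = int(p * 10) + 1  # the amount of output varies with the parameter
    return pd.DataFrame({"param": np.full(n, p), "out": np.random.random(n)})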

Load Balancing Distributed Data

Some computations, such as filter, join, and groupby, can produce imbalanced data chunks across cores for distributed data, leaving some cores with nearly empty dataframes while others hold relatively large ones.

Bodo provides bodo.rebalance to allow manual load balancing when necessary. For example:

@bodo.jit(distributed={"df"})
def rebalance_example(df):
    df = df[df["A"] > 3]
    df = bodo.rebalance(df)
    return df.sum()

Here, bodo.rebalance ensures the filtered dataframe has near-equal chunk sizes across cores, which accelerates later computations (the sum in this case).

We can also use the dests keyword to specify a subset of ranks to which Bodo should distribute the data from all ranks.

Example usage:

@bodo.jit(distributed={"df"})
def rebalance_example(df):
    df = df[df["A"] > 3]
    df = bodo.rebalance(df, dests=[0, 1])
    return df.sum()

Explicit Parallel Loops

Sometimes explicit parallel loops are required because a program cannot easily be written in terms of data-parallel operators. In this case, one can use Bodo's prange in place of range to indicate that a loop can be parallelized. The user is responsible for ensuring that the loop has no cross-iteration dependencies other than supported reductions.

The example below demonstrates a parallel loop with a reduction:

import bodo
from bodo import prange
import numpy as np

@bodo.jit
def prange_test(n):
    A = np.random.ranf(n)
    s = 0
    B = np.empty(n)
    for i in prange(len(A)):
        bodo.parallel_print("rank", bodo.get_rank())
        # A[i]: distributed data access with loop index
        # s: a supported sum reduction
        s += A[i]
        # write array with loop index
        B[i] = 2 * A[i]
    return s + B.sum()

res = prange_test(10)
print(res)
[stdout:0]
rank 0
rank 0
rank 0
13.077183553245497
[stdout:1]
rank 1
rank 1
rank 1
13.077183553245497
[stdout:2]
rank 2
rank 2
13.077183553245497
[stdout:3]
rank 3
rank 3
13.077183553245497

Currently, reductions using the +=, *=, min, and max operators are supported. Iterations are divided between processes and executed in parallel, while reduction results are combined across processes through data exchange.
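
For instance, here is a minimal sketch of min and max reductions in a prange loop (the array contents and initial values are arbitrary; each reduction variable is only updated through min or max with itself):

import bodo
import numpy as np

@bodo.jit
def minmax_test(n):
    A = np.cos(np.arange(n))  # arbitrary data in [-1, 1]
    lo = 1.0   # start at the top of the data range
    hi = -1.0  # start at the bottom of the data range
    for i in bodo.prange(len(A)):
        lo = min(lo, A[i])  # supported min reduction
        hi = max(hi, A[i])  # supported max reduction
    return lo, hi

print(minmax_test(1000))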

Integration with non-Bodo APIs

There are multiple methods for integrating with APIs that Bodo does not support natively:

  1. Switch to Python object mode inside jit functions

  2. Pass data in and out of jit functions
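
For the first method, Bodo provides an object mode block similar to Numba's objmode: code inside the block runs as regular Python, and the type of every value leaving the block must be annotated. The sketch below is a minimal illustration under that assumption; unsupported_func merely stands in for a call that Bodo cannot compile natively.

import bodo
import numpy as np

# stand-in for any regular Python call that Bodo cannot compile natively
def unsupported_func(x):
    return float(np.log1p(x))

@bodo.jit
def objmode_example(n):
    A = np.arange(n, dtype=np.float64)
    s = A.sum()  # reduction result, replicated on all ranks
    # switch to object mode; the annotation declares the type of res
    with bodo.objmode(res="float64"):
        res = unsupported_func(s)
    return res

print(objmode_example(10))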

Passing Distributed Data

Bodo can receive or return chunks of distributed data, which allows flexible integration with any non-Bodo Python code. The following example passes chunks of distributed data out of a jit function, interpolates them with Scipy, and passes the interpolation results back into another jit function.

import bodo
import numpy as np
import scipy.interpolate

@bodo.jit(distributed=["X", "Y", "X2"])
def dist_pass_test(n):
    X = np.arange(n)
    Y = np.exp(-X/3.0)
    X2 = np.arange(0, n, 0.5)
    return X, Y, X2

X, Y, X2 = dist_pass_test(100)
# clip potential out-of-range values
X2 = np.minimum(np.maximum(X2, X[0]), X[-1])
f = scipy.interpolate.interp1d(X, Y)
Y2 = f(X2)

@bodo.jit(distributed={"Y2"})
def dist_pass_res(Y2):
    return Y2.sum()

res = dist_pass_res(Y2)
print(res)
[stdout:0] 6.555500504321469
[stdout:1] 6.555500504321469
[stdout:2] 6.555500504321469
[stdout:3] 6.555500504321469

Collections of Distributed Data

List and dictionary collections can be used to hold distributed data structures:

import bodo
import numpy as np
import pandas as pd

@bodo.jit(distributed=["df"])
def f():
    to_concat = []
    for i in range(10):
        to_concat.append(pd.DataFrame({"A": np.arange(100), "B": np.random.random(100)}))
    # concatenate the collected chunks after the loop
    df = pd.concat(to_concat)
    return df

f()
      A         B
0     0  0.518256
1     1  0.996147
2     2  0.881703
3     3  0.821504
4     4  0.311216
..   ..       ...
20   20  0.440666
21   21  0.142903
22   22  0.825534
23   23  0.359685
24   24  0.534700

250 rows × 2 columns

      A         B
25   25  0.284761
26   26  0.441711
27   27  0.468827
28   28  0.015361
29   29  0.002683
..   ..       ...
45   45  0.217445
46   46  0.372188
47   47  0.737716
48   48  0.168481
49   49  0.757296

250 rows × 2 columns

      A         B
50   50  0.430431
51   51  0.572574
52   52  0.347954
53   53  0.547276
54   54  0.558948
..   ..       ...
70   70  0.768203
71   71  0.106369
72   72  0.036671
73   73  0.485589
74   74  0.137820

250 rows × 2 columns

      A         B
75   75  0.323295
76   76  0.928662
77   77  0.769746
78   78  0.988702
79   79  0.452371
..   ..       ...
95   95  0.458132
96   96  0.959298
97   97  0.988239
98   98  0.797115
99   99  0.071809

250 rows × 2 columns