bodo.pandas.BodoSeries.map_partitions_with_state
BodoSeries.map_partitions_with_state(init_state_fn, func, *args, output_type=None, **kwargs) -> BodoSeries
Some initialization code is expensive enough that it should run only once per worker. APIs such as map and map_partitions are not suitable for this purpose because they would require initialization per row or per partition, respectively. Likewise, map_with_state may not be appropriate if there are significant per-row overheads that can be amortized across a batch; map_partitions_with_state covers that case by passing func a whole partition at a time.
Note
Calling BodoSeries.map_partitions_with_state immediately executes the plan, initializes the state, and runs func on a small number of rows in order to determine the output type of the series. This plan execution and initialization can be avoided by specifying output_type manually.
Note
If per-worker clean-up is required, the state can be an instance of a class that defines a __del__ method in which the clean-up is performed.
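The clean-up pattern from the note above can be sketched in plain Python. This is a minimal illustration, not Bodo code: the class name WorkerState and the cleanup_log list are hypothetical, and the sketch assumes the state is released once the last reference to it is dropped. When Bodo itself drops its reference to the state is not covered here.

```python
import gc

cleanup_log = []  # hypothetical stand-in for releasing an external resource


class WorkerState:
    """Illustrative state object; the name is an assumption, not a Bodo API."""

    def __init__(self):
        # Expensive, once-per-worker setup would go here.
        self.lookup = {1: 7}

    def __del__(self):
        # Called when the object is garbage collected.
        cleanup_log.append("closed")


def init_state():
    return WorkerState()


state = init_state()
del state     # drop the last reference to the state
gc.collect()  # make collection explicit for interpreters without refcounting
```

Python does not guarantee that __del__ runs for objects that still exist when the interpreter exits, so clean-up that must always happen may need an additional safeguard.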
Parameters
- init_state_fn : function: Initialization function. Run only once per worker.
- func : function: Mapping correspondence. The first argument passed to func is the previously initialized state variable. The second argument is the partition to map, in the form of a pandas Series.
- *args : tuple of Any: Additional positional arguments to func.
- output_type : {None, pandas.Series}, default None: If None, the plan is executed and, after init_state_fn is called, a sample of rows is passed to func to determine the output type. This parameter can instead be a pandas Series with the output dtype set, in which case the plan, func, and init_state_fn are not immediately executed.
- **kwargs : dict of string to Any: Additional keyword arguments to func.
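The calling convention these parameters describe can be sketched with plain pandas. The helper emulate_map_partitions_with_state below is hypothetical and is not part of Bodo; it assumes a single worker and splits the series into fixed-size chunks purely to show that init_state_fn runs once, while func runs once per partition and receives the state, the partition, and any extra arguments. The chunk size and the init_calls counter are also illustrative assumptions.

```python
import pandas as pd


def emulate_map_partitions_with_state(series, init_state_fn, func, *args,
                                      n_rows_per_partition=5, **kwargs):
    """Hypothetical single-worker stand-in, not Bodo's implementation."""
    state = init_state_fn()  # initialization happens once, not per partition
    pieces = []
    for start in range(0, len(series), n_rows_per_partition):
        partition = series.iloc[start:start + n_rows_per_partition]
        # func gets the state first, then the partition, then the extras.
        pieces.append(func(state, partition, *args, **kwargs))
    return pd.concat(pieces)


init_calls = []  # records how many times initialization ran


def init_state():
    init_calls.append(1)
    return {"offset": 7}


def per_batch(state, batch, prefix, suffix=""):
    # prefix arrives through *args, suffix through **kwargs.
    return batch.map(lambda v: prefix + str(v + state["offset"]) + suffix)


result = emulate_map_partitions_with_state(
    pd.Series(range(20)), init_state, per_batch, "bodo", suffix="!"
)
```

In this sketch the 20 rows are processed in four chunks, yet init_calls ends up with a single entry.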
Returns
- BodoSeries
Example
import bodo.pandas as pd

class mystate:
    def __init__(self):
        self.dict = {1: 7}

def init_state():
    # Runs once per worker.
    return mystate()

def per_batch(state, batch, *args, **kwargs):
    # batch is one partition, passed as a pandas Series.
    def per_row(row):
        return "bodo" + str(row + state.dict[1])
    return batch.map(per_row)

a = pd.Series(list(range(20)))
b = a.map_partitions_with_state(init_state, per_batch, output_type=pd.Series(dtype="string[pyarrow]"))
print(b)
Output:
0 bodo7
1 bodo8
2 bodo9
3 bodo10
4 bodo11
5 bodo12
6 bodo13
7 bodo14
8 bodo15
9 bodo16
10 bodo17
11 bodo18
12 bodo19
13 bodo20
14 bodo21
15 bodo22
16 bodo23
17 bodo24
18 bodo25
19 bodo26
dtype: large_string[pyarrow]