DataFrame API

BodoDataFrame.apply

BodoDataFrame.apply(
        func,
        axis=0,
        raw=False,
        result_type=None,
        args=(),
        by_row="compat",
        engine="python",
        engine_kwargs=None,
        **kwargs,
    ) -> BodoSeries

Apply a function along an axis of the BodoDataFrame.

Currently only supports applying a function that returns a scalar value for each row (i.e. axis=1). All other uses will fall back to Pandas. See pandas.DataFrame.apply for more details.
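
For instance, a row-wise call with a scalar-returning function runs natively, while a column-wise call falls back; a minimal sketch:

import bodo.pandas as bd

bdf = bd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

# Supported natively: axis=1 with a function returning one scalar per row.
row_sums = bdf.apply(lambda row: row["a"] + row["b"], axis=1)

# Expected to fall back to pandas.DataFrame.apply: axis=0 applies over columns.
col_sums = bdf.apply(lambda col: col.sum(), axis=0)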

Note

Calling BodoDataFrame.apply will immediately execute a plan to generate a small sample of the BodoDataFrame and then call pandas.DataFrame.apply on the sample to infer output types before proceeding with lazy evaluation.

Parameters

func : function: Function to apply to each row.

axis : {0 or 1}, default 0: The axis to apply the function over. axis=0 will fall back to pandas.DataFrame.apply.

args : tuple: Additional positional arguments to pass to func.

**kwargs: Additional keyword arguments to pass as keyword arguments to func.

All other parameters will trigger a fallback to pandas.DataFrame.apply if a non-default value is provided.

Returns

BodoSeries: The result of applying func to each row in the BodoDataFrame.

Example

import bodo.pandas as bd

bdf = bd.DataFrame(
        {
            "a": bd.array([1, 2, 3] * 4, "Int64"),
            "b": bd.array([4, 5, 6] * 4, "Int64"),
            "c": ["a", "b", "c"] * 4,
        },
    )

out_bodo = bdf.apply(lambda x: x["a"] + 1, axis=1)

print(type(out_bodo))
print(out_bodo)

Output:

<class 'bodo.pandas.series.BodoSeries'>
0     2
1     3
2     4
3     2
4     3
5     4
6     2
7     3
8     4
9     2
10    3
11    4
dtype: int64[pyarrow]


BodoDataFrame.groupby

BodoDataFrame.groupby(
    by=None,
    axis=lib.no_default,
    level=None,
    as_index=True,
    sort=False,
    group_keys=True,
    observed=lib.no_default,
    dropna=True
) -> DataFrameGroupBy

Creates a DataFrameGroupBy object representing the data in the input DataFrame grouped by a column or list of columns. The object can then be used to apply functions over groups.

Parameters

by : str | List[str]: The column or list of columns to use when creating groups.

as_index : bool, default True: Whether the grouped labels will appear as an index in the final output. If as_index is False, the grouped labels will appear as regular columns.

dropna : bool, default True: If True, rows where the group label contains a missing value will be dropped from the final output.

All other parameters will trigger a fallback to pandas.DataFrame.groupby if a non-default value is provided.

Returns

DataFrameGroupBy

Examples

import bodo.pandas as bd

bdf1 = bd.DataFrame({
    "A": ["foo", "foo", "bar", "bar"],
    "B": [1, 1, 1, None],
    "C": [1, 2, 3, 4]
})

bdf2 = bdf1.groupby(["A", "B"]).sum()
print(bdf2)

Output:

         C
A   B
bar 1.0  3
foo 1.0  3


bdf3 = bdf1.groupby(["A", "B"], as_index=False, dropna=False).sum()
print(bdf3)

Output:

     A     B  C
0  bar  <NA>  4
1  foo   1.0  3
2  bar   1.0  3
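
Other reductions can be applied to the DataFrameGroupBy object in the same way; a minimal sketch, assuming mean follows the same pattern as sum (methods without native support would be expected to fall back to Pandas):

bdf4 = bdf1.groupby("A").mean()
print(bdf4)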


BodoDataFrame.head

BodoDataFrame.head(n=5) -> BodoDataFrame

Returns the first n rows of the BodoDataFrame.

Parameters

n : int, default 5: Number of rows to select.

Returns

BodoDataFrame

Example

import bodo.pandas as bd

original_df = bd.DataFrame(
    {"foo": range(15), "bar": range(15, 30)}
)

original_df.to_parquet("example.pq")

restored_df = bd.read_parquet("example.pq")
restored_df_head = restored_df.head(2)
print(type(restored_df_head))
print(restored_df_head)

Output:

<class 'bodo.pandas.frame.BodoDataFrame'>
   foo  bar
0    0   15
1    1   16


BodoDataFrame.map_partitions

BodoDataFrame.map_partitions(func, *args, **kwargs) -> BodoSeries | BodoDataFrame

Apply a function to groups of rows in a DataFrame and return a DataFrame or Series of the same size.

If the input DataFrame is lazy (i.e., its plan has not been evaluated yet) and func returns a Series, the output will be lazy as well. When the lazy output is evaluated, func is called on batches of rows from the input DataFrame. In cases where func returns a DataFrame or the input DataFrame is not lazy, each worker calls func on its entire local chunk of the input DataFrame.

Parameters

func : Callable: A function that takes in a DataFrame and returns a DataFrame or Series (with the same number of rows). Currently, functions that return a DataFrame will trigger execution even if the input DataFrame has a lazy plan.

*args: Additional positional arguments to pass to func.

**kwargs: Additional keyword arguments to pass as keyword arguments to func.

Returns

BodoSeries or BodoDataFrame: The result of applying func to the BodoDataFrame.

Example

import bodo.pandas as bd

bdf = bd.DataFrame(
    {"foo": range(15), "bar": range(15, 30)}
)

bdf_mapped = bdf.map_partitions(lambda df_: df_.foo + df_.bar)
print(bdf_mapped)

Output:

0     15
1     17
2     19
3     21
4     23
5     25
6     27
7     29
8     31
9     33
10    35
11    37
12    39
13    41
14    43
dtype: int64[pyarrow]
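
For contrast, a function that returns a DataFrame triggers execution of the input's lazy plan, as noted above; a minimal sketch (the added column name baz is illustrative):

# Returns a DataFrame, so this call triggers execution of bdf's plan.
bdf_mapped_df = bdf.map_partitions(lambda df_: df_.assign(baz=df_.foo + df_.bar))
print(type(bdf_mapped_df))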


Setting DataFrame Columns

Bodo DataFrames support setting columns lazily when the value is a Series created from the same DataFrame or a constant value. Other cases will fall back to Pandas.

Examples

import bodo.pandas as bd

bdf = bd.DataFrame(
        {
            "A": bd.array([1, 2, 3, 7] * 3, "Int64"),
            "B": ["A1", "B1 ", "C1", "Abc"] * 3,
            "C": bd.array([4, 5, 6, -1] * 3, "Int64"),
        }
    )

bdf["D"] = bdf["B"].str.lower()
print(type(bdf))
print(bdf.D)

Output:

<class 'bodo.pandas.frame.BodoDataFrame'>
0      a1
1     b1
2      c1
3     abc
4      a1
5     b1
6      c1
7     abc
8      a1
9     b1
10     c1
11    abc
Name: D, dtype: string

import bodo.pandas as bd

bdf = bd.DataFrame(
        {
            "A": bd.array([1, 2, 3, 7] * 3, "Int64"),
            "B": ["A1", "B1 ", "C1", "Abc"] * 3,
            "C": bd.array([4, 5, 6, -1] * 3, "Int64"),
        }
    )

bdf["D"] = 11
print(type(bdf))
print(bdf.D)

Output:

<class 'bodo.pandas.frame.BodoDataFrame'>
0     11
1     11
2     11
3     11
4     11
5     11
6     11
7     11
8     11
9     11
10    11
11    11
Name: D, dtype: int64[pyarrow]
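
A value that is neither a constant nor a Series derived from the same DataFrame, such as a column of a different DataFrame, would be expected to fall back to Pandas; a minimal sketch:

other = bd.DataFrame({"E": bd.array([0] * 12, "Int64")})

# The value comes from a different DataFrame, so this assignment is
# expected to fall back to Pandas instead of staying lazy.
bdf["E"] = other["E"]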


BodoDataFrame.sort_values

BodoDataFrame.sort_values(by, *, axis=0, ascending=True, inplace=False, kind="quicksort", na_position="last", ignore_index=False, key=None) -> BodoDataFrame

Sorts the elements of the BodoDataFrame and returns a new sorted BodoDataFrame.

Parameters

by : str or list of str: Name or list of column names to sort by.

ascending : bool or list of bool, default True: Sort ascending vs. descending. Specify a list for multiple sort orders. If this is a list of bools, it must match the length of by.

na_position : str {'first', 'last'} or list of str, default 'last': 'first' puts NaNs at the beginning; 'last' puts NaNs at the end. Specify a list for multiple NaN orders by key. If this is a list of strings, it must match the length of by.

All other parameters will trigger a fallback to pandas.DataFrame.sort_values if a non-default value is provided.

Returns

BodoDataFrame

Example

import bodo.pandas as bd

bdf = bd.DataFrame(
    {
        "A": bd.array([1, 2, 3, 7] * 3, "Int64"),
        "B": ["A1", "B1", "C1", "Abc"] * 3,
        "C": bd.array([6, 5, 4] * 4, "Int64"),
    }
)

bdf_sorted = bdf.sort_values(by=["A", "C"], ascending=[False, True])
print(bdf_sorted)

Output:

    A    B  C
0   7  Abc  4
1   7  Abc  5
2   7  Abc  6
3   3   C1  4
4   3   C1  5
5   3   C1  6
6   2   B1  4
7   2   B1  5
8   2   B1  6
9   1   A1  4
10  1   A1  5
11  1   A1  6


BodoDataFrame.to_parquet

BodoDataFrame.to_parquet(path=None, engine="auto", compression="snappy", index=None, partition_cols=None, storage_options=None, row_group_size=-1, **kwargs)

Write a DataFrame as a Parquet dataset.

Parameters

path : str: Output path to write. It can be a local path (e.g. output.parquet), AWS S3 (s3://...), Azure ADLS (abfs://..., abfss://...), or GCP GCS (gcs://..., gs://...).

compression : str, default 'snappy': File compression to use. Can be None, 'snappy', 'gzip', or 'brotli'.

row_group_size : int: Row group size in output Parquet files. -1 allows the backend to choose.

All other parameters will trigger a fallback to pandas.DataFrame.to_parquet if a non-default value is provided.

Example

import bodo.pandas as bd

bdf = bd.DataFrame(
    {
        "A": bd.array([1, 2, 3, 7] * 3, "Int64"),
        "B": ["A1", "B1", "C1", "Abc"] * 3,
        "C": bd.array([6, 5, 4] * 4, "Int64"),
    }
)

bdf.to_parquet("output.parquet")
print(bd.read_parquet("output.parquet"))

Output:

    A    B  C
0   1   A1  6
1   2   B1  5
2   3   C1  4
3   7  Abc  6
4   1   A1  5
5   2   B1  4
6   3   C1  6
7   7  Abc  5
8   1   A1  4
9   2   B1  6
10  3   C1  5
11  7  Abc  4
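
The compression and row_group_size parameters can be combined on the same call; a minimal sketch with illustrative values:

# gzip-compressed output with row groups of 100,000 rows.
bdf.to_parquet("output_gzip.parquet", compression="gzip", row_group_size=100_000)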


BodoDataFrame.to_iceberg

BodoDataFrame.to_iceberg(
        table_identifier,
        catalog_name=None,
        *,
        catalog_properties=None,
        location=None,
        append=False,
        partition_spec=None,
        sort_order=None,
        properties=None,
        snapshot_properties=None
)

Write a DataFrame as an Iceberg dataset.

Refer to pandas.DataFrame.to_iceberg for more details.

Warning

This function is experimental in Pandas and may change in future releases.

Note

This function assumes that the Iceberg namespace is already created in the catalog.
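
If the namespace does not exist yet, it can be created directly with PyIceberg before writing; a minimal sketch (the catalog name and connection properties are placeholders):

from pyiceberg.catalog import load_catalog

# Hypothetical REST catalog; substitute your own catalog configuration.
catalog = load_catalog("my_catalog", type="rest", uri="http://localhost:8181")
catalog.create_namespace("my_namespace")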

Parameters

table_identifier: str: Table identifier to write.

catalog_name: str, optional: Name of the catalog to use. If not provided, the default catalog will be used. See PyIceberg's documentation for more details.

catalog_properties: dict[str, Any], optional: Properties for the catalog connection.

location: str, optional: Location of the table (if supported by the catalog). If this is passed a path and catalog_name and catalog_properties are None, it will use a filesystem catalog with the provided location. If the location is an S3 Tables ARN it will use the S3TablesCatalog.

append: bool, default False: If True, append to the table if it already exists; otherwise the table is overwritten.

partition_spec: PartitionSpec, optional: PyIceberg partition spec for the table (only used if creating a new table). See PyIceberg's documentation for more details.

sort_order: SortOrder, optional: PyIceberg sort order for the table (only used if creating a new table). See PyIceberg's documentation for more details.

properties: dict[str, Any], optional: Properties to add to the new table.

snapshot_properties: dict[str, Any], optional: Properties to add to the new table snapshot.

Example

Simple write of a table on the filesystem without a catalog:

import bodo.pandas as bd
from pyiceberg.transforms import IdentityTransform
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.table.sorting import SortField, SortOrder

bdf = bd.DataFrame(
        {
            "one": [-1.0, 1.3, 2.5, 3.0, 4.0, 6.0, 10.0],
            "two": ["foo", "bar", "baz", "foo", "bar", "baz", "foo"],
            "three": [True, False, True, True, True, False, False],
            "four": [-1.0, 5.1, 2.5, 3.0, 4.0, 6.0, 11.0],
            "five": ["foo", "bar", "baz", None, "bar", "baz", "foo"],
        }
    )

part_spec = PartitionSpec(PartitionField(2, 1001, IdentityTransform(), "id_part"))
sort_order = SortOrder(SortField(source_id=4, transform=IdentityTransform()))
bdf.to_iceberg("test_table", location="./iceberg_warehouse", partition_spec=part_spec, sort_order=sort_order)

out_df = bd.read_iceberg("test_table", location="./iceberg_warehouse")
# Only reads Parquet files of partition "foo" from storage
print(out_df[out_df["two"] == "foo"])

Output:

    one  two  three  four  five
0  -1.0  foo   True  -1.0   foo
1   3.0  foo   True   3.0  <NA>
2  10.0  foo  False  11.0   foo

Write a DataFrame to an Iceberg table in S3 Tables using the location parameter:

df.to_iceberg(
    table_identifier="my_table",
    location="arn:aws:s3tables:<region>:<account_number>:my-bucket/my-table"
)
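
Writing through a catalog configured in-line via catalog_properties follows the same pattern; a hedged sketch (the REST URI and table identifier are placeholders):

df.to_iceberg(
    table_identifier="my_namespace.my_table",
    catalog_properties={"type": "rest", "uri": "http://localhost:8181"},
)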