DataFrame API¶
BodoDataFrame.apply¶
BodoDataFrame.apply(
func,
axis=0,
raw=False,
result_type=None,
args=(),
by_row="compat",
engine="python",
engine_kwargs=None,
**kwargs,
) -> BodoSeries
Apply a function along an axis of the BodoDataFrame.
Currently only supports applying a function that returns a scalar value for each row (i.e. axis=1). All other uses will fall back to Pandas.
See pandas.DataFrame.apply for more details.
Note
Calling BodoDataFrame.apply will immediately execute a plan to generate a small sample of the BodoDataFrame and then call pandas.DataFrame.apply on the sample to infer output types before proceeding with lazy evaluation.
Parameters
- func : function: Function to apply to each row.
- axis : {0 or 1}, default 0: The axis to apply the function over. axis=0 will fall back to pandas.DataFrame.apply.
- args : tuple: Additional positional arguments to pass to func.
- **kwargs: Additional keyword arguments to pass as keyword arguments to func.
- All other parameters will trigger a fallback to pandas.DataFrame.apply if a non-default value is provided.
Returns
- BodoSeries: The result of applying func to each row in the BodoDataFrame.
Example
import bodo.pandas as bd
bdf = bd.DataFrame(
{
"a": bd.array([1, 2, 3] * 4, "Int64"),
"b": bd.array([4, 5, 6] * 4, "Int64"),
"c": ["a", "b", "c"] * 4,
},
)
out_bodo = bdf.apply(lambda x: x["a"] + 1, axis=1)
print(type(out_bodo))
print(out_bodo)
Output:
<class 'bodo.pandas.series.BodoSeries'>
0 2
1 3
2 4
3 2
4 3
5 4
6 2
7 3
8 4
9 2
10 3
11 4
dtype: int64[pyarrow]
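The args and **kwargs parameters forward extra arguments to func. A minimal sketch (scale_and_shift is a hypothetical function written for this example, not part of the API):
import bodo.pandas as bd
bdf = bd.DataFrame(
    {
        "a": bd.array([1, 2, 3] * 4, "Int64"),
        "b": bd.array([4, 5, 6] * 4, "Int64"),
    }
)
# Hypothetical row function taking an extra positional and keyword argument.
def scale_and_shift(row, factor, shift=0):
    return row["a"] * factor + shift
# factor is passed through args, shift is passed through **kwargs.
out = bdf.apply(scale_and_shift, axis=1, args=(10,), shift=1)
print(out)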
BodoDataFrame.groupby¶
BodoDataFrame.groupby(
by=None,
axis=lib.no_default,
level=None,
as_index=True,
sort=False,
group_keys=True,
observed=lib.no_default,
dropna=True
) -> DataFrameGroupBy
Creates a DataFrameGroupBy object representing the data in the input DataFrame grouped by a column or list of columns. The object can then be used to apply functions over groups.
Parameters
- by : str | List[str]: The column or list of columns to use when creating groups.
- as_index : bool, default True: Whether the group labels will appear as an index in the final output. If as_index is False, then the group labels will appear as regular columns.
- dropna : bool, default True: If True, rows where the group label contains a missing value will be dropped from the final output.
- All other parameters will trigger a fallback to pandas.DataFrame.groupby if a non-default value is provided.
Returns
- DataFrameGroupBy
Examples
import bodo.pandas as bd
bdf1 = bd.DataFrame({
"A": ["foo", "foo", "bar", "bar"],
"B": [1, 1, 1, None],
"C": [1, 2, 3, 4]
})
bdf2 = bdf1.groupby(["A", "B"]).sum()
print(bdf2)
Output:
         C
A   B
bar 1.0  3
foo 1.0  3
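To keep the group labels as regular columns instead of an index, pass as_index=False as described in the parameters above (a sketch; the row order of the result is not guaranteed):
import bodo.pandas as bd
bdf1 = bd.DataFrame({
    "A": ["foo", "foo", "bar", "bar"],
    "B": [1, 1, 1, None],
    "C": [1, 2, 3, 4]
})
# Group labels "A" and "B" come back as regular columns.
bdf2 = bdf1.groupby(["A", "B"], as_index=False).sum()
print(bdf2)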
BodoDataFrame.head¶
BodoDataFrame.head(n=5) -> BodoDataFrame
Returns the first n rows of the BodoDataFrame.
Parameters
- n : int, default 5: Number of rows to select.
Returns
- BodoDataFrame
Example
import bodo.pandas as bd
original_df = bd.DataFrame(
{"foo": range(15), "bar": range(15, 30)}
)
original_df.to_parquet("example.pq")
restored_df = bd.read_parquet("example.pq")
restored_df_head = restored_df.head(2)
print(type(restored_df_head))
print(restored_df_head)
Output:
<class 'bodo.pandas.frame.BodoDataFrame'>
   foo  bar
0    0   15
1    1   16
BodoDataFrame.map_partitions¶
BodoDataFrame.map_partitions(func, *args, **kwargs)
Apply a function to groups of rows in a DataFrame and return a DataFrame or Series of the same size.
If the input DataFrame is lazy (i.e. its plan has not been evaluated yet) and func returns a Series, then the output will be lazy as well. When the lazy output is evaluated, func will take batches of rows from the input DataFrame. In cases where func returns a DataFrame or the input DataFrame is not lazy, each worker will call func on its entire local chunk of the input DataFrame.
Parameters
- func : Callable: A function that takes in a DataFrame and returns a DataFrame or Series (with the same number of rows). Currently, functions that return a DataFrame will trigger execution even if the input DataFrame has a lazy plan.
- *args: Additional positional arguments to pass to func.
- **kwargs: Additional keyword arguments to pass as keyword arguments to func.
Returns
- BodoSeries or BodoDataFrame: The result of applying func to the BodoDataFrame.
Example
import bodo.pandas as bd
bdf = bd.DataFrame(
{"foo": range(15), "bar": range(15, 30)}
)
bdf_mapped = bdf.map_partitions(lambda df_: df_.foo + df_.bar)
print(bdf_mapped)
Output:
0 15
1 17
2 19
3 21
4 23
5 25
6 27
7 29
8 31
9 33
10 35
11 37
12 39
13 41
14 43
dtype: int64[pyarrow]
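Per the *args parameter above, extra positional arguments are forwarded to func. A minimal sketch (weighted_sum is a hypothetical helper written for this example, not part of the API):
import bodo.pandas as bd
bdf = bd.DataFrame(
    {"foo": range(15), "bar": range(15, 30)}
)
# Hypothetical batch function: receives a chunk of rows at a time and
# returns a Series of the same length.
def weighted_sum(df_, weight):
    return df_.foo * weight + df_.bar
# weight=2 is forwarded to weighted_sum through *args.
bdf_mapped = bdf.map_partitions(weighted_sum, 2)
print(bdf_mapped)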
Setting DataFrame Columns¶
Bodo DataFrames support setting columns lazily when the value is a Series created from the same DataFrame or a constant value. Other cases will fall back to Pandas.
Examples
import bodo.pandas as bd
bdf = bd.DataFrame(
{
"A": bd.array([1, 2, 3, 7] * 3, "Int64"),
"B": ["A1", "B1 ", "C1", "Abc"] * 3,
"C": bd.array([4, 5, 6, -1] * 3, "Int64"),
}
)
bdf["D"] = bdf["B"].str.lower()
print(type(bdf))
print(bdf.D)
Output:
<class 'bodo.pandas.frame.BodoDataFrame'>
0 a1
1 b1
2 c1
3 abc
4 a1
5 b1
6 c1
7 abc
8 a1
9 b1
10 c1
11 abc
Name: D, dtype: string
import bodo.pandas as bd
bdf = bd.DataFrame(
{
"A": bd.array([1, 2, 3, 7] * 3, "Int64"),
"B": ["A1", "B1 ", "C1", "Abc"] * 3,
"C": bd.array([4, 5, 6, -1] * 3, "Int64"),
}
)
bdf["D"] = 11
print(type(bdf))
print(bdf.D)
Output:
<class 'bodo.pandas.frame.BodoDataFrame'>
0 11
1 11
2 11
3 11
4 11
5 11
6 11
7 11
8 11
9 11
10 11
11 11
Name: D, dtype: int64[pyarrow]
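A column derived from other columns of the same DataFrame is also a Series created from that DataFrame, so the assignment below is expected to stay lazy as well (a sketch):
import bodo.pandas as bd
bdf = bd.DataFrame(
    {
        "A": bd.array([1, 2, 3, 7] * 3, "Int64"),
        "C": bd.array([4, 5, 6, -1] * 3, "Int64"),
    }
)
# "D" is computed from columns of the same DataFrame, so no fallback
# to Pandas should be needed.
bdf["D"] = bdf["A"] + bdf["C"]
print(bdf.D)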
BodoDataFrame.sort_values¶
BodoDataFrame.sort_values(by, *, axis=0, ascending=True, inplace=False, kind="quicksort", na_position="last", ignore_index=False, key=None)
Parameters
- by : str or list of str: Name or list of column names to sort by.
- ascending : bool or list of bool, default True: Sort ascending vs. descending. Specify a list for multiple sort orders. If this is a list of bools, it must match the length of by.
- na_position : str {'first', 'last'} or list of str, default 'last': 'first' puts NaNs at the beginning; 'last' puts NaNs at the end. Specify a list for multiple NaN orders by key. If this is a list of strings, it must match the length of by.
- All other parameters will trigger a fallback to pandas.DataFrame.sort_values if a non-default value is provided.
Returns
- BodoDataFrame
Example
import bodo.pandas as bd
bdf = bd.DataFrame(
{
"A": bd.array([1, 2, 3, 7] * 3, "Int64"),
"B": ["A1", "B1", "C1", "Abc"] * 3,
"C": bd.array([6, 5, 4] * 4, "Int64"),
}
)
bdf_sorted = bdf.sort_values(by=["A", "C"], ascending=[False, True])
print(bdf_sorted)
Output:
A B C
0 7 Abc 4
1 7 Abc 5
2 7 Abc 6
3 3 C1 4
4 3 C1 5
5 3 C1 6
6 2 B1 4
7 2 B1 5
8 2 B1 6
9 1 A1 4
10 1 A1 5
11 1 A1 6
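na_position also accepts one value per sort key, as described in the parameters above. A minimal sketch with missing values (output not shown):
import bodo.pandas as bd
bdf = bd.DataFrame(
    {
        "A": bd.array([1, None, 3, None] * 3, "Int64"),
        "C": bd.array([6, 5, 4] * 4, "Int64"),
    }
)
# Missing values in "A" sort to the front; missing values in "C"
# (none here) would sort to the back.
bdf_sorted = bdf.sort_values(by=["A", "C"], na_position=["first", "last"])
print(bdf_sorted)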
BodoDataFrame.to_parquet¶
BodoDataFrame.to_parquet(path=None, engine="auto", compression="snappy", index=None, partition_cols=None, storage_options=None, row_group_size=-1, **kwargs)
Parameters
- path : str: Output path to write. It can be a local path (e.g. output.parquet), AWS S3 (s3://...), Azure ADLS (abfs://..., abfss://...), or GCP GCS (gcs://..., gs://...).
- compression : str, default 'snappy': File compression to use. Can be None, 'snappy', 'gzip', or 'brotli'.
- row_group_size : int: Row group size in output Parquet files. -1 allows the backend to choose.
- All other parameters will trigger a fallback to pandas.DataFrame.to_parquet.
Example
import bodo.pandas as bd
bdf = bd.DataFrame(
{
"A": bd.array([1, 2, 3, 7] * 3, "Int64"),
"B": ["A1", "B1", "C1", "Abc"] * 3,
"C": bd.array([6, 5, 4] * 4, "Int64"),
}
)
bdf.to_parquet("output.parquet")
print(bd.read_parquet("output.parquet"))
Output:
A B C
0 1 A1 6
1 2 B1 5
2 3 C1 4
3 7 Abc 6
4 1 A1 5
5 2 B1 4
6 3 C1 6
7 7 Abc 5
8 1 A1 4
9 2 B1 6
10 3 C1 5
11 7 Abc 4
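Since compression and row_group_size are supported directly (see the parameters above), non-default values for them should not trigger a fallback. A sketch:
import bodo.pandas as bd
bdf = bd.DataFrame(
    {"A": bd.array([1, 2, 3, 7] * 3, "Int64")}
)
# gzip-compressed output with an explicit row group size.
bdf.to_parquet("output_gzip.parquet", compression="gzip", row_group_size=100_000)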
BodoDataFrame.to_iceberg¶
BodoDataFrame.to_iceberg(
table_identifier,
catalog_name=None,
*,
catalog_properties=None,
location=None,
append=False,
partition_spec=None,
sort_order=None,
properties=None,
snapshot_properties=None
)
Refer to pandas.DataFrame.to_iceberg for more details.
Warning
This function is experimental in Pandas and may change in future releases.
Note
This function assumes that the Iceberg namespace is already created in the catalog.
Parameters
- table_identifier : str: Table identifier to write.
- catalog_name : str, optional: Name of the catalog to use. If not provided, the default catalog will be used. See PyIceberg's documentation for more details.
- catalog_properties : dict[str, Any], optional: Properties for the catalog connection.
- location : str, optional: Location of the table (if supported by the catalog). If this is passed a path and catalog_name and catalog_properties are None, it will use a filesystem catalog with the provided location. If the location is an S3 Tables ARN, it will use the S3TablesCatalog.
- append : bool: If True, append to the table if it exists; otherwise overwrite it.
- partition_spec : PartitionSpec, optional: PyIceberg partition spec for the table (only used if creating a new table). See PyIceberg's documentation for more details.
- sort_order : SortOrder, optional: PyIceberg sort order for the table (only used if creating a new table). See PyIceberg's documentation for more details.
- properties : dict[str, Any], optional: Properties to add to the new table.
- snapshot_properties : dict[str, Any], optional: Properties to add to the new table snapshot.
Example
Simple write of a table on the filesystem without a catalog:
import bodo.pandas as bd
from pyiceberg.transforms import IdentityTransform
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.table.sorting import SortField, SortOrder
bdf = bd.DataFrame(
{
"one": [-1.0, 1.3, 2.5, 3.0, 4.0, 6.0, 10.0],
"two": ["foo", "bar", "baz", "foo", "bar", "baz", "foo"],
"three": [True, False, True, True, True, False, False],
"four": [-1.0, 5.1, 2.5, 3.0, 4.0, 6.0, 11.0],
"five": ["foo", "bar", "baz", None, "bar", "baz", "foo"],
}
)
part_spec = PartitionSpec(PartitionField(2, 1001, IdentityTransform(), "id_part"))
sort_order = SortOrder(SortField(source_id=4, transform=IdentityTransform()))
bdf.to_iceberg("test_table", location="./iceberg_warehouse", partition_spec=part_spec, sort_order=sort_order)
out_df = bd.read_iceberg("test_table", location="./iceberg_warehouse")
# Only reads Parquet files of partition "foo" from storage
print(out_df[out_df["two"] == "foo"])
Output:
    one  two  three  four  five
0  -1.0  foo   True  -1.0   foo
1   3.0  foo   True   3.0  <NA>
2  10.0  foo  False  11.0   foo
Write a DataFrame to an Iceberg table in S3 Tables using the location parameter:
df.to_iceberg(
table_identifier="my_table",
location="arn:aws:s3tables:<region>:<account_number>:my-bucket/my-table"
)
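Assuming the table from the first example already exists, append=True adds rows instead of overwriting (a sketch):
import bodo.pandas as bd
bdf = bd.DataFrame(
    {
        "one": [20.0],
        "two": ["foo"],
        "three": [True],
        "four": [7.5],
        "five": ["bar"],
    }
)
# append=True adds new rows to the existing table rather than
# replacing its contents.
bdf.to_iceberg("test_table", location="./iceberg_warehouse", append=True)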