AI¶
Extensions to the DataFrame library for AI training and inference.
bodo.ai.tokenize¶
Refer to bodo.pandas.BodoSeries.ai.tokenize.
bodo.ai.embed¶
Refer to bodo.pandas.BodoSeries.ai.embed.
bodo.ai.llm_generate¶
Refer to bodo.pandas.BodoSeries.ai.llm_generate.
bodo.ai.prepare_model¶
def prepare_model(
    model,
    parallel_strategy: Literal["ddp", "fsdp"] | None = "ddp",
    parallel_strategy_kwargs: dict[str, Any] | None = None,
) -> torch.nn.Module | None:
Prepares a PyTorch model for distributed training using the specified parallel strategy. Should only be used within the train_loop_per_worker function passed to bodo.ai.torch_train. This function also transfers the model to the appropriate device if accelerators are available. If there are fewer accelerators than workers, some workers will return None, which indicates they should not perform training.
Parameters
- model: torch.nn.Module: The PyTorch model to prepare for distributed training.
- parallel_strategy: Literal["ddp", "fsdp"] | None: The parallel strategy to use. Supported strategies are:
    - "ddp": Distributed Data Parallel
    - "fsdp": Fully Sharded Data Parallel
  If None, no parallel strategy is applied and the model is returned as-is.
- parallel_strategy_kwargs: dict[str, Any] | None: Additional keyword arguments to pass to the parallel strategy constructor (e.g., torch.nn.parallel.DistributedDataParallel or torch.distributed.fsdp.FullyShardedDataParallel).
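A minimal sketch of how prepare_model might be called inside a training loop, assuming the bodo.ai import path used in these headings; the FSDP CPUOffload argument is shown only to illustrate how parallel_strategy_kwargs is forwarded to the wrapper constructor, and the two-layer model is hypothetical:

import torch.nn as nn
from torch.distributed.fsdp import CPUOffload

import bodo.ai

def train_loop_per_worker():
    # Hypothetical two-layer model; any torch.nn.Module works here.
    model = nn.Sequential(nn.Linear(16, 64), nn.ReLU(), nn.Linear(64, 1))

    # Wrap the model for Fully Sharded Data Parallel training; extra
    # constructor arguments are forwarded via parallel_strategy_kwargs.
    model = bodo.ai.prepare_model(
        model,
        parallel_strategy="fsdp",
        parallel_strategy_kwargs={"cpu_offload": CPUOffload(offload_params=True)},
    )
    if model is None:
        # Fewer accelerators than workers: this worker skips training.
        return
    # ... rest of the training loop ...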
bodo.ai.prepare_dataset¶
def prepare_dataset(
    data: DataFrame | Series,
    batch_size: int,
    shuffle: bool = True,
    dataset_func: Callable | None = None,
    collate_fn: Callable = None,
    pin_memory: bool = False,
    seed: int = 0,
) -> torch.utils.data.DataLoader:
Prepares a dataset for distributed training by converting a DataFrame or Series into a DataLoader. This function partitions the data among workers, applies optional shuffling, and creates batches for training.
Parameters
- data: DataFrame | Series: The DataFrame or Series to be converted into a dataset.
- batch_size: int: The size of each batch to be created.
- shuffle: bool: Whether to shuffle the data before creating batches. Default is True.
- dataset_func: Callable | None: An optional function to customize the dataset creation process. If provided, this function should accept a DataFrame or Series and return a PyTorch Dataset. If None, a default dataset creation process is used that converts each row into a tensor.
- collate_fn: Callable: An optional function to customize how batches are collated. Default is None, which uses the default collation behavior of PyTorch DataLoader.
- pin_memory: bool: Whether to pin memory for faster data transfer to GPU. Default is False.
- seed: int: A seed value for shuffling the data to ensure reproducibility. Default is 0.
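As a sketch of the customization hooks, the example below passes a user-defined Dataset through dataset_func; FeatureLabelDataset and the column names are hypothetical, and the data argument is assumed to be the worker's partition of a DataFrame:

import torch
from torch.utils.data import Dataset

import bodo.ai

class FeatureLabelDataset(Dataset):
    # Hypothetical Dataset that splits each row into (features, label) tensors.
    def __init__(self, df):
        self.features = torch.tensor(
            df[["feature1", "feature2"]].to_numpy(), dtype=torch.float32
        )
        self.labels = torch.tensor(df["label"].to_numpy(), dtype=torch.float32)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

def train_loop_per_worker(data, config):
    dataloader = bodo.ai.prepare_dataset(
        data,
        batch_size=config.get("batch_size", 32),
        shuffle=True,
        dataset_func=FeatureLabelDataset,  # DataFrame -> torch.utils.data.Dataset
        pin_memory=torch.cuda.is_available(),
        seed=42,
    )
    for features, labels in dataloader:
        ...  # training step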
bodo.ai.torch_train¶
def torch_train(
    train_loop_per_worker: Callable[[], None] | Callable[[dict], None],
    *args,
    **kwargs,
) -> None:
Trains a PyTorch model in a distributed manner across multiple workers using the provided training loop function. This function initializes the distributed environment, partitions the dataset, and executes the training loop on each worker.
Parameters
- train_loop_per_worker: Callable[[], None] | Callable[[dict], None]: A user-defined function that contains the training logic to be executed on each worker. This function can optionally accept a dictionary of configuration parameters that will be passed to it.
- *args: Positional arguments to be passed to the train_loop_per_worker function.
- **kwargs: Keyword arguments to be passed to the train_loop_per_worker function.
Example:
The following example demonstrates how to use bodo.ai.torch_train to train a simple neural network on a dataset. The training loop handles both CPU and GPU training based on the available hardware; if you know which you will be training on, you can simplify the code by removing the irrelevant parts.
import bodo.pandas as pd
import bodo.ai.train
import tempfile

df = pd.DataFrame(
    {
        "feature1": pd.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0], dtype="float32"),
        "feature2": pd.array([2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0], dtype="float32"),
        "label": pd.array([3.0, 5.0, 7.0, 9.0, 11.0, 13.0, 15.0], dtype="float32"),
    }
)

def train_loop(data, config):
    import torch
    import torch.distributed.checkpoint
    import torch.nn as nn

    # Simple linear regression model
    class SimpleModel(nn.Module):
        def __init__(self):
            super().__init__()
            self.linear1 = nn.Linear(2, 32)
            self.relu = nn.ReLU()
            self.linear2 = nn.Linear(32, 1)

        def forward(self, x):
            return self.linear2(self.relu(self.linear1(x)))

    model = SimpleModel()
    model = bodo.ai.train.prepare_model(model, parallel_strategy="ddp")
    dataloader = bodo.ai.train.prepare_dataset(
        data, batch_size=config.get("batch_size", 2)
    )
    if model is None:
        # Fewer accelerators than workers: this worker does not train.
        return

    # Train on data
    criterion = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    device = next(model.parameters()).device
    for epoch in range(config.get("epochs", 5)):
        for batch in dataloader:
            batch = batch.to(device)
            inputs = batch[:, :2]
            labels = batch[:, 2].unsqueeze(1)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        # Create checkpoint (unwrap the DDP wrapper before saving).
        base_model = (
            model.module
            if isinstance(model, torch.nn.parallel.DistributedDataParallel)
            else model
        )
        torch.distributed.checkpoint.save(
            {"model_state_dict": base_model.state_dict()},
            checkpoint_id=config["checkpoint_dir"],
        )
        print(f"Epoch {epoch}, Loss: {loss.item()}")

bodo.ai.train.torch_train(
    train_loop,
    df,
    {"batch_size": 2, "checkpoint_dir": tempfile.mkdtemp("checkpoint_dir")},
)
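After training, the saved weights can be read back with torch.distributed.checkpoint; the snippet below is a sketch assuming the same SimpleModel definition as inside train_loop and a checkpoint_dir variable holding the directory passed through config above:

import torch.distributed.checkpoint

# Rebuild the architecture, then load the saved weights into it in place.
model = SimpleModel()  # same definition as inside train_loop above (assumed available)
state = {"model_state_dict": model.state_dict()}
torch.distributed.checkpoint.load(state, checkpoint_id=checkpoint_dir)  # checkpoint_dir: the directory passed in config above
model.load_state_dict(state["model_state_dict"])
model.eval()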