Scalable Data I/O

Efficient parallel data processing requires data I/O to be parallelized effectively as well. Bodo provides parallel file I/O for many different formats such as Parquet, CSV, JSON, Numpy binaries, HDF5 and SQL databases. This diagram demonstrates how chunks of data are partitioned among parallel execution engines by Bodo.


Bodo reads dataset chunks in parallel

Bodo automatically parallelizes I/O for any number of cores and cluster size without any additional API layers.

I/O Workflow

Make sure I/O calls for large data are inside JIT functions to allow Bodo to parallelize I/O and divide the data across cores automatically (see below for supported formats and APIs).

Warning

Performing I/O in regular Python (outside JIT functions) replicates data on all Python processes, which can result in out-of-memory errors if the data is large. For example, a 1 GB dataframe replicated on 1000 cores consumes 1 TB of memory.

Bodo examines the schema of the input dataset at compilation time to infer the datatype of the resulting dataframe. This requires the dataset path to be available to the compiler. The path should be either a constant string value, an argument to the JIT function, or a simple combination of the two. For example, the following code passes the dataset path as an argument, allowing Bodo to infer the data type of df:

import os
import pandas as pd
import bodo

data_path = os.environ["JOB_DATA_PATH"]

@bodo.jit
def f(path):
    df = pd.read_parquet(path)
    print(df.A.sum())

f(data_path)

Concatenating arguments and constant values also works:

import os
import pandas as pd
import bodo

data_root = os.environ["JOB_DATA_ROOT"]

@bodo.jit
def f(root):
    df = pd.read_parquet(root + "/table1.pq")
    print(df.A.sum())

f(data_root)

In the rare case that the path should be a dynamic value inside JIT functions, the data types have to be specified manually (see Specifying I/O Data Types Manually). This is error-prone and should be avoided as much as possible.

Supported Data Formats

Currently, Bodo supports I/O for Parquet, CSV, SQL, JSON, HDF5, and Numpy binaries formats. It can read these formats from multiple filesystems, including S3, HDFS and Azure Data Lake (ADLS) (see File Systems below for more information). Many databases and data warehouses such as Snowflake are supported as well.

Also see supported pandas APIs for supported arguments of I/O functions.

Parquet

Parquet is a commonly used file format in analytics due to its efficient columnar storage. Bodo supports the standard pandas API for reading Parquet: pd.read_parquet(path), where path can be a parquet file, a directory with multiple parquet files (all are part of the same dataframe), a glob pattern, list of files or list of glob patterns:

import pandas as pd
import bodo

@bodo.jit
def write_pq(df):
    df.to_parquet("s3://bucket-name/example.pq")

@bodo.jit
def read_pq():
    df = pd.read_parquet("s3://bucket-name/example.pq")
    return df
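
A glob pattern or a list of files works the same way; for example (the bucket and file names below are placeholders):

import pandas as pd
import bodo

@bodo.jit
def read_pq_glob():
    # Read all Parquet files matching the glob pattern into a single dataframe
    df = pd.read_parquet("s3://bucket-name/data/*.pq")
    return df

@bodo.jit
def read_pq_list():
    # Read an explicit list of Parquet files into a single dataframe
    df = pd.read_parquet([
        "s3://bucket-name/data/part-00.parquet",
        "s3://bucket-name/data/part-01.parquet",
    ])
    return df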

Note

Bodo reads datasets in parallel using multiple cores while ensuring that the number of rows read on all cores is roughly equal. The size and number of row groups can affect parallel read performance significantly. Currently, reading any number of rows in Bodo requires reading at least one row-group. To read even a single row from a parquet dataset, the entire row-group containing that row (and its corresponding metadata) needs to be read first, and then the required row is extracted from it. Therefore, for best parallel read performance, there must be sufficient row-groups to minimize the number of instances where multiple cores need to read from the same row group. This means there must be at least as many row groups as the number of cores, but ideally a lot more. At the same time, the size of the row-groups should not be too small since this can lead to overheads. For more details about parquet file format, refer to the format specification.
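
If you control how the dataset is written, the row-group size can be tuned at write time. Here is a minimal sketch using regular pandas with the pyarrow engine outside of JIT (the file name and row_group_size value are only illustrative):

import pandas as pd

df = pd.DataFrame({"A": range(1_000_000)})
# Extra keyword arguments are forwarded to the pyarrow engine; roughly
# 100,000 rows per row group gives parallel readers enough row groups
# to split across cores without making each row group too small.
df.to_parquet("example_rowgroups.pq", engine="pyarrow", row_group_size=100_000)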

to_parquet(name) with distributed data writes to a folder called name. Each process writes one file into the folder, but if the data is not distributed, to_parquet(name) writes to a single file called name:

df = pd.DataFrame({"A": range(10)})

@bodo.jit
def example1_pq(df):
    df.to_parquet("example1.pq")

@bodo.jit(distributed={"df"})
def example2_pq(df):
    df.to_parquet("example2.pq")

if bodo.get_rank() == 0:
    example1_pq(df)
example2_pq(df)

Run the code above with 4 processors:

mpiexec -n 4 python example_pq.py

example1_pq(df) writes a single file, and example2_pq(df) writes a folder containing 4 parquet files:

.
├── example1.pq
├── example2.pq
│   ├── part-00.parquet
│   ├── part-01.parquet
│   ├── part-02.parquet
│   └── part-03.parquet

See read_parquet(), to_parquet() for supported arguments.

Filter Pushdown and Column Pruning

Bodo can detect filters used by the code and optimize the read_parquet call by pushing the filters down to the storage layer, so that only the rows required by the program are read. In addition, Bodo only reads the columns that are used in the program, and prunes the unused columns. These optimizations can significantly speed up I/O in many cases and can substantially reduce the program's memory footprint.

For example, suppose we have a large dataset with many columns that spans many years, and we only need to read revenue data for a particular year:

@bodo.jit
def query():
    df = pd.read_parquet("s3://my-bucket/data.pq")
    df = df[df["year"] == 2021]
    return df.groupby("customer_key")["revenue"].max()

When compiling the above, Bodo detects the df[df["year"] == 2021] filter and optimizes the read_parquet call so that it only reads data for year 2021 from S3. This requires the dataframe filtering operation to be in the same JIT function as read_parquet, and the dataframe variable should not be used before filtering. Bodo also ensures that only the customer_key and revenue columns are read, since the other columns are not used in the program.

In general, if the dataset is hive-partitioned and partition columns appear in filter expressions, only the files that contain relevant data are read, and the rest are discarded based on their path. For example, if year is a partition column above and we have a dataset:

.
└── data.pq/
    │   ...
    ├───year=2020/
    │   ├── part-00.parquet
    │   └── part-01.parquet
    └───year=2021/
        ├── part-02.parquet
        └── part-03.parquet

Bodo will only read the files in the year=2021 directory.

For non-partition columns, Bodo may discard files entirely just by looking at their parquet metadata (depending on the filters and statistics contained in the metadata) or filter the rows during read.
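
To produce a hive-partitioned layout like the one above, the dataset can be written with partition_cols, shown here with regular pandas outside of JIT (the column values and output path are only illustrative):

import pandas as pd

df = pd.DataFrame({
    "year": [2020, 2020, 2021, 2021],
    "customer_key": [1, 2, 1, 3],
    "revenue": [10.0, 20.0, 30.0, 40.0],
})
# Creates data.pq/year=2020/... and data.pq/year=2021/... subdirectories
df.to_parquet("data.pq", partition_cols=["year"])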

Note

Filter pushdown is often a very important optimization and critical for having manageable memory footprint in big data workloads. Make sure filtering happens in the same JIT function right after dataset read (or JIT functions for I/O are inlined, see inlining).

Exploring Large Data Without Full Read

Exploring large datasets often requires seeing their shape and a sample of the data. Bodo can provide this information quickly without loading the full Parquet dataset, which means there is no need for a large cluster with a lot of memory. For example:

@bodo.jit
def head_only_read():
    df = pd.read_parquet("s3://my-bucket/example.pq")
    print(df.shape)
    print(df.head())

In this example, Bodo provides the shape information for the full dataset in df.shape, but only loads the first few rows that are necessary for df.head().

CSV

CSV is a common text format for data exchange. Bodo supports most of the standard pandas API to read CSV files:

import pandas as pd
import bodo

@bodo.jit
def write_csv(df):
    df.to_csv("s3://my-bucket/example.csv")

@bodo.jit
def read_csv():
    df = pd.read_csv("s3://my-bucket/example.csv")
    return df

Unlike read_csv in regular pandas, Bodo can also read a directory that contains multiple partitioned CSV files. All files in the folder must have the same number of columns and the same column data types, but they can have different numbers of rows.

Usage:

@bodo.jit
def read_csv_folder():
    df = pd.read_csv("s3://my-bucket/path/to/folder/foldername")
    doSomething(df)

Use sep="n" to read text files line by line into a single-column dataframe (without creating separate columns, useful when text data is unstructured or there are too many columns to read efficiently):

@bodo.jit
def read_test():
    df = pd.read_csv("example.csv", sep="n", names=["value"], dtype={"value": "str"})
    return df

Note

Bodo uses pandas' nullable integer types to ensure type stability (see the integer NA issue in pandas for more details). Therefore, data types must be specified explicitly for accurate performance comparisons between Bodo and pandas for read_csv.
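
For example, pinning a nullable integer dtype explicitly makes the Bodo and pandas outputs directly comparable; a minimal sketch (the file and column names are illustrative):

@bodo.jit
def read_csv_nullable():
    # "Int64" is pandas' nullable integer dtype, matching Bodo's behavior
    df = pd.read_csv("example.csv", dtype={"A": "Int64", "B": "float64"})
    return df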

to_csv(name) has different behaviors for different file systems:

  1. POSIX file systems: always writes to a single file, regardless of the number of processes and whether the data is distributed, but writing is still done in parallel when more than 1 processor is used:

    df = pd.DataFrame({"A": np.arange(n)})
    
    @bodo.jit
    def example1_csv(df):
        df.to_csv("example1.csv")
    
    @bodo.jit(distributed={"df"})
    def example2_csv(df):
        df.to_csv("example2.csv")
    
    if bodo.get_rank() == 0:
        example1_csv(df)
    example2_csv(df)
    

    Run the code above with 4 processors:

    mpiexec -n 4 python example_csv.py
    

    Both example1_csv(df) and example2_csv(df) write to a single file:

    .
    ├── example1.csv
    ├── example2.csv
    
  2. S3 and HDFS: distributed data is written to a folder called name. Each process writes one file into the folder, but if the data is not distributed, to_csv(name) writes to a single file called name:

    df = pd.DataFrame({"A": np.arange(n)})
    
    @bodo.jit
    def example1_csv(df):
        df.to_csv("s3://bucket-name/example1.csv")
    
    @bodo.jit(distributed={"df"})
    def example2_csv(df):
        df.to_csv("s3://bucket-name/example2.csv")
    
    if bodo.get_rank() == 0:
        example1_csv(df)
    example2_csv(df)
    

    Run the code above with 4 processors:

    mpiexec -n 4 python example_csv.py
    

    example1_csv(df) writes a single file, and example2_csv(df) writes a folder containing 4 csv files:

    .
    ├── example1.csv
    ├── example2.csv
    │   ├── part-00.csv
    │   ├── part-01.csv
    │   ├── part-02.csv
    │   └── part-03.csv
    

See read_csv(), to_csv() for supported arguments.

JSON

For JSON, the syntax is also the same as pandas.

Usage:

@bodo.jit
def example_write_json(df, fname):
    df.to_json(fname)

@bodo.jit
def example_read_json_lines_format():
    df = pd.read_json("example.json", orient = "records", lines = True)

@bodo.jit
def example_read_json_multi_lines():
    df = pd.read_json("example_file.json", orient = "records", lines = False,
        dtype={"A": float, "B": "bool", "C": int})

Note

  • The dtype argument is required when reading a regular multi-line JSON file.
  • Bodo cannot read a directory containing multiple multi-line JSON files.
  • Bodo's default values for orient and lines are "records" and False, respectively.

to_json(name) has different behaviors for different file systems:

  1. POSIX file systems: to_json(name) behavior depends on orient and lines arguments.

    1. DataFrame.to_json(name, orient="records", lines=True) (i.e. writing JSON Lines text file format) always writes to a single file, regardless of the number of processes and whether the data is distributed, but writing is still done in parallel when more than 1 processor is used:

      df = pd.DataFrame({"A": np.arange(n)})
      
      @bodo.jit
      def example1_json(df):
          df.to_json("example1.json", orient="records", lines=True)
      
      @bodo.jit(distributed={"df"})
      def example2_json(df):
          df.to_json("example2.json", orient="records", lines=True)
      
      if bodo.get_rank() == 0:
          example1_json(df)
      example2_json(df)
      

      Run the code above with 4 processors:

      mpiexec -n 4 python example_json.py
      

      Both example1_json(df) and example2_json(df) write to a single file:

      .
      ├── example1.json
      ├── example2.json
      
    2. All other combinations of values for orient and lines have the same behavior as S3 and HDFS explained below.

  2. S3 and HDFS: distributed data is written to a folder called name. Each process writes one file into the folder, but if the data is not distributed, to_json(name) writes to a single file called name:

    df = pd.DataFrame({"A": np.arange(n)})
    
    @bodo.jit
    def example1_json(df):
        df.to_json("s3://bucket-name/example1.json")
    
    @bodo.jit(distributed={"df"})
    def example2_json(df):
        df.to_json("s3://bucket-name/example2.json")
    
    if bodo.get_rank() == 0:
        example1_json(df)
    example2_json(df)
    

    Run the code above with 4 processors:

    mpiexec -n 4 python example_json.py
    

    example1_json(df) writes a single file, and example2_json(df) writes a folder containing 4 json files:

    .
    ├── example1.json
    ├── example2.json
    │   ├── part-00.json
    │   ├── part-01.json
    │   ├── part-02.json
    │   └── part-03.json
    

See read_json(), to_json() for supported arguments.

SQL

See Databases for the list of supported Relational Database Management Systems (RDBMS) with Bodo.

For SQL, the syntax is also the same as pandas. For reading:

@bodo.jit
def example_read_sql():
    df = pd.read_sql("select * from employees", "mysql+pymysql://<username>:<password>@<host>/<db_name>")

See read_sql() for supported arguments.

For writing:

@bodo.jit
def example_write_sql(df):
    df.to_sql("table_name", "mysql+pymysql://<username>:<password>@<host>/<db_name>")

See to_sql() for supported arguments.

Note

sqlalchemy must be installed in order to use pandas.read_sql.

Filter Pushdown and Column Pruning

Similar to Parquet read, Bodo JIT compiler is able to push down filters to the data source and prune unused columns automatically. For example, this program reads data from a very large Snowflake table, but only needs limited rows and columns:

@bodo.jit
def filter_ex(conn, int_val):
    df = pd.read_sql("SELECT * FROM LINEITEM", conn)
    df = df[(df["l_orderkey"] > 10) & (int_val >= df["l_linenumber"])]
    result = df["l_suppkey"]
    print(result)

filter_ex(conn, 2)

Bodo optimizes the query passed to read_sql to push filters down and prune unused columns. In this case, Bodo will replace SELECT * FROM LINEITEM with the optimized version automatically:

SELECT "L_SUPPKEY" FROM (SELECT * FROM LINEITEM) as TEMP
WHERE  ( ( l_orderkey > 10 ) AND ( l_linenumber <= 2 ) )

Delta Lake

Reading parquet files from Delta Lake is supported locally, from S3, and from Azure ADLS.

  • The Delta Lake binding Python package needs to be installed using pip: pip install deltalake.
  • For S3, the AWS_DEFAULT_REGION environment variable should be set to the region of the bucket hosting the Delta Lake table.
  • For ADLS, the AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_KEY environment variables need to be set.

Example code for reading:

@bodo.jit
def example_read_deltalake():
    df = pd.read_parquet("path/to/deltalake")

Note

Writing is currently not supported.

Iceberg

Bodo's support for Iceberg Tables is under active development and currently only supports basic read and write functionality.

Bodo supports reading Iceberg tables stored in a directory on HDFS, either locally or from S3, through Pandas' read_sql_table API.

  • Bodo's Iceberg Connector python package needs to be installed using conda: conda install bodo-iceberg-connector -c bodo.ai -c conda-forge.
  • For tables on S3, the credentials should be set either using environment variables, or AWS configuration in ~/.aws or using an instance profile on the EC2 instance.

Iceberg connection strings vary by catalog, but in general they are of the form iceberg<+conn>://<path><?params>, where:

  • <conn>://<path> is the location of the catalog or Iceberg warehouse.
  • params is a list of properties to pass to the catalog. Each parameter must be of the form <key>=<value>, with parameters separated by &, similar to HTTP URLs.

The following parameters are officially supported:

  • type: Type of catalog. The supported values are listed below. When the connection string is ambiguous, this parameter is used to determine the type of catalog implementation.
  • warehouse: Location of the warehouse. Required when creating a new table using a Glue, Nessie, or Hive catalog.

The following catalogs are supported:

  • Hadoop Catalog on Local Filesystem:

    • Used when type=hadoop is specified or when <conn> is file or empty
    • <path> is the absolute path to the warehouse (directory containing the database schema)
    • Parameter warehouse will be ignored if specified
    • E.g. iceberg://<ABSOLUTE PATH TO ICEBERG WAREHOUSE> or iceberg+file://<ABSOLUTE PATH TO ICEBERG WAREHOUSE>
  • Hadoop Catalog on S3

    • Used when type=hadoop-s3 is specified or when <conn> is s3
    • <conn>://<path> is the S3 path to the warehouse (directory or bucket containing the database schema)
    • Parameter warehouse will be ignored if specified
    • E.g. iceberg+s3://<S3 PATH TO ICEBERG WAREHOUSE>
  • Dremio Arctic or Nessie Catalog

    • Must specify type=nessie as a parameter to use this warehouse.
    • <conn>://<path> is the URL to the Nessie catalog, which can be found on Dremio's dashboard.
      • It will look like the following: https://nessie.dremio.cloud/v1/projects/<PROJECT ID>
      • <PROJECT ID> is the Nessie project UUID
    • The following parameters are required:
      • authentication.type=BEARER
      • authentication.token=<AUTH TOKEN> where <AUTH TOKEN> is your personal Dremio authentication token and can be found on the dashboard
    • Parameter warehouse is required to create a table
    • E.g. iceberg+https://nessie.dremio.cloud/v1/projects/<PROJECT ID>?type=nessie&authentication.type=BEARER&authentication.token=<AUTH TOKEN>
  • AWS Glue Catalog

    • Connection string must be of the form iceberg+glue?<params>
    • Parameter type will be ignored if specified
    • Parameter warehouse is required to create a table
    • E.g. iceberg+glue or iceberg+glue?warehouse=s3://<ICEBERG-BUCKET>
  • Hive / Thrift Catalog

    • Used when type=hive is specified or when <conn> is thrift
    • <conn>://<path> is the URL to the Thrift catalog, i.e. thrift://localhost:9083
    • Parameter warehouse is required to create the table
    • E.g. iceberg+thrift://<THRIFT URL>
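
As a worked illustration of the format above, a Nessie connection string with its required parameters could be assembled in Python as follows (the project ID and token are placeholders to fill in):

# Placeholders; substitute your own Nessie project ID and Dremio token
project_id = "<PROJECT ID>"
auth_token = "<AUTH TOKEN>"
conn = (
    f"iceberg+https://nessie.dremio.cloud/v1/projects/{project_id}"
    f"?type=nessie&authentication.type=BEARER&authentication.token={auth_token}"
)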

Example code for reading:

@bodo.jit
def example_read_iceberg():
    df = pd.read_sql_table(
            table_name="<NAME OF ICEBERG TABLE", 
            con="<SEE PREVIOUS SECTION ON HOW TO FORMAT THIS FOR DIFFERENT CATALOGS>",
            schema="<NAME OF ICEBERG DATABASE SCHEMA>"
         )

Note

  • schema argument is required for reading Iceberg tables.

  • The Iceberg table to read should be located at <warehouse-location>/<schema>/<table_name>, where schema and table_name are the arguments to pd.read_sql_table, and warehouse-location is inferred from the connection string based on the description provided above.

Warning

Bodo has basic support for writing Iceberg tables from Pandas Dataframes using the to_sql API, including support for appending to tables with an existing partition spec and/or sort order.

Example code for writing:

@bodo.jit(distributed=["df"])
def write_iceberg_table(df: pandas.DataFrame):
    df.to_sql(
        name="<NAME OF ICEBERG TABLE",
        con="<SEE PREVIOUS SECTION ON HOW TO FORMAT THIS FOR DIFFERENT CATALOGS>",
        schema="<NAME OF ICEBERG DATABASE SCHEMA>",
        if_exists="replace"
    )

Note

  • schema argument is required for writing Iceberg tables.
  • Writing Pandas Dataframe index to an Iceberg table is not supported. If index and index_label are provided, they will be ignored.
  • chunksize, dtype and method arguments are not supported and will be ignored if provided.
  • While Bodo can append to tables with an existing partition spec and/or sort order, it does not support creating new tables with a Partition Spec or Sort Order.

Numpy binaries

Numpy's fromfile and tofile are supported as below:

@bodo.jit
def example_np_io():
    A = np.fromfile("myfile.dat", np.float64)
    ...
    A.tofile("newfile.dat")

Bodo has the same behavior as Numpy for numpy.ndarray.tofile(), where it always writes to a single file. Writing distributed data to POSIX file systems is done in parallel, but writing to S3 and HDFS is done sequentially (due to file system limitations).

HDF5

HDF5 is a common format in scientific computing, especially for multi-dimensional numerical data. HDF5 can be very efficient at scale, since it has native parallel I/O support. For HDF5, the syntax is the same as the h5py package. For example:

@bodo.jit
def example_h5():
    f = h5py.File("data.hdf5", "r")
    X = f["points"][:]
    Y = f["responses"][:]

File Systems

Amazon S3

Reading and writing CSV, Parquet, JSON, and Numpy binary files from and to Amazon S3 is supported.

The fsspec package must be available, and the file path should start with s3://:

@bodo.jit
def example_s3_parquet():
    df = pd.read_parquet("s3://bucket-name/file_name.parquet")

These environment variables are used for File I/O with S3 credentials (see the example below):

  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
  • AWS_DEFAULT_REGION: defaults to us-east-1
  • AWS_S3_ENDPOINT: specify a custom host name (defaults to the AWS endpoint s3.amazonaws.com)
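
These can also be set from Python before calling any JIT functions; a minimal sketch (the credential values and bucket name are placeholders):

import os
import pandas as pd
import bodo

# Placeholder credentials; alternatively rely on IAM roles or AWS config files
os.environ["AWS_ACCESS_KEY_ID"] = "<ACCESS KEY>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<SECRET KEY>"
os.environ["AWS_DEFAULT_REGION"] = "us-east-2"

@bodo.jit
def read_s3_parquet():
    return pd.read_parquet("s3://bucket-name/file_name.parquet")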

Connecting to S3 endpoints through a proxy is supported. The proxy URI can be provided by setting one of the following environment variables (listed in order of precedence):

  • http_proxy
  • https_proxy
  • HTTP_PROXY
  • HTTPS_PROXY

Bodo uses Apache Arrow internally for read and write of data on S3.

Google Cloud Storage

Reading and writing Parquet files from and to Google Cloud is supported.

The file path should start with gs:// or gcs://:

@bodo.jit
def example_gcs_parquet():
    df = pd.read_parquet("gcs://bucket-name/file_name.parquet")
These environment variables are used for File I/O with GCS credentials:

  • GOOGLE_APPLICATION_CREDENTIALS

Details for GOOGLE_APPLICATION_CREDENTIALS can be seen in the Google docs here.

Bodo uses the fsspec-based gcsfs library internally for read and write of data on GCS.

Hadoop Distributed File System (HDFS) and Azure Data Lake Storage (ADLS) Gen2

Reading and writing CSV, Parquet, JSON, and Numpy binary files from and to Hadoop Distributed File System (HDFS) is supported. Note that Azure Data Lake Storage Gen2 can be accessed through HDFS.

The openjdk version 11 package must be available, and the file path should start with hdfs:// or abfs[s]://:

@bodo.jit
def example_hdfs_parquet():
    df = pd.read_parquet("hdfs://host:port/dir/file_name.pq")

These environment variables are used for File I/O with HDFS:

  • HADOOP_HOME: the root of your installed Hadoop distribution. It often contains lib/native/libhdfs.so.
  • ARROW_LIBHDFS_DIR: location of libhdfs, usually $HADOOP_HOME/lib/native.
  • CLASSPATH: must contain the Hadoop jars. You can set these using:
    export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob`
    

Bodo uses Apache Arrow internally for read and write of data on HDFS. $HADOOP_HOME/etc/hadoop/hdfs-site.xml provides default behaviors for the HDFS client used by Bodo. Inconsistent configurations (e.g. dfs.replication) could potentially cause errors in Bodo programs.

Setting up HDFS/ADLS Credentials

Hadoop Filesystem sources its credentials from the first available core-site.xml file on the CLASSPATH. When Hadoop is set up (including on Bodo Platform), this file is usually created at $HADOOP_HOME/etc/hadoop/core-site.xml automatically. You can edit this file and set credentials appropriately.

You can also write the core-site configuration to bodo.HDFS_CORE_SITE_LOC, which is a temporary file Bodo adds to the CLASSPATH when it is initialized:

import bodo

# Initialize the temporary directory where the core-site file
# will be written
bodo.HDFS_CORE_SITE_LOC_DIR.initialize()

# Define the core-site for your regular ADLS/HDFS read/write
# operations
CORE_SITE_SPEC = """
<configuration>
...
</configuration>
"""

# Write it to the temporary core-site file.
# Do it on one rank on every node to avoid filesystem conflicts.
if bodo.get_rank() in bodo.get_nodes_first_ranks():
    with open(bodo.HDFS_CORE_SITE_LOC, 'w') as f:
        f.write(CORE_SITE_SPEC)


@bodo.jit
def etl_job():
    df = pd.read_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/file.pq")
    # ..
    # .. Some ETL Processing
    # ..
    df.to_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/out_file.pq")
    return


etl_job()

There are several authorization methods for reading from or writing to ADLS Storage Containers, all of which require slightly different core-site configurations. Here are some examples:

  • Using an ADLS Access Key

    import bodo
    import pandas as pd
    
    # Initialize the temporary directory where the core-site file
    # will be written
    bodo.HDFS_CORE_SITE_LOC_DIR.initialize()
    
    
    # Define the core-site for your regular ADLS/HDFS read/write
    # operations
    CORE_SITE_SPEC = """<configuration>
    <property>
        <name>fs.abfs.impl</name>
        <value>org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem</value>
    </property>
    <property>
        <name>fs.azure.account.auth.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net</name>
        <value>SharedKey</value>
    </property>
    <property>
        <name>fs.azure.account.key.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net</name>
        <value>{ADLS_SECRET}</value>
        <description> The ADLS storage account access key itself.</description>
    </property>
    </configuration>
    """
    
    # Write it to the temporary core-site file.
    # Do it on one rank on every node to avoid filesystem conflicts.
    if bodo.get_rank() in bodo.get_nodes_first_ranks():
        with open(bodo.HDFS_CORE_SITE_LOC, 'w') as f:
            f.write(CORE_SITE_SPEC)
    
    # Run your ETL job
    @bodo.jit
    def etl_job():
        df = pd.read_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/file.pq")
        # ..
        # .. Some ETL Processing
        # ..
        df.to_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/out_file.pq")
        return
    
    
    etl_job()
    
  • Using a SAS Token

    To use SAS Tokens, you need to install the bodo-azurefs-sas-token-provider package (it can be installed using conda install bodo-azurefs-sas-token-provider -c bodo.ai -c conda-forge). This is already installed on the Bodo Platform. Then in your program, do the following:

    import bodo
    # Importing this module adds the required JARs to the CLASSPATH
    import bodo_azurefs_sas_token_provider
    import pandas as pd
    import os
    
    # Initialize the temporary directory where the core-site file
    # will be written
    bodo.HDFS_CORE_SITE_LOC_DIR.initialize()
    
    
    # Define the core-site for your regular ADLS/HDFS read/write
    # operations
    CORE_SITE_SPEC = """<configuration>
        <property>
            <name>fs.azure.account.auth.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net</name>
            <value>SAS</value>
        </property>
        <property>
            <name>fs.azure.sas.token.provider.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net</name>
            <value>org.bodo.azurefs.sas.BodoSASTokenProvider</value>
        </property>
        <property>
            <name>fs.abfs.impl</name>
            <value>org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem</value>
        </property>
    </configuration>
    """
    
    # Write it to the temporary core-site file.
    # Do it on one rank on every node to avoid filesystem conflicts.
    if bodo.get_rank() in bodo.get_nodes_first_ranks():
        with open(bodo.HDFS_CORE_SITE_LOC, 'w') as f:
            f.write(CORE_SITE_SPEC)
    
    # Load the SAS token here.
    SAS_TOKEN = "..."
    
    # Write SAS Token to a file.
    # Do it on one rank on every node to avoid filesystem conflicts.
    if bodo.get_rank() in bodo.get_nodes_first_ranks():
        with open(os.path.join(bodo.HDFS_CORE_SITE_LOC_DIR.name, "sas_token.txt"), 'w') as f:
            f.write(SAS_TOKEN)
    
    # Run your ETL job
    @bodo.jit
    def etl_job():
        df = pd.read_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/file.pq")
        # ..
        # .. Some ETL Processing
        # ..
        df.to_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/out_file.pq")
        return
    
    
    etl_job()
    

Interleaving HDFS/ADLS I/O with Snowflake Write

For writing to an Azure based Snowflake account using the direct upload strategy (see Snowflake Write), Bodo writes the required core-site configuration to bodo.HDFS_CORE_SITE_LOC automatically. In cases where Snowflake write (to an Azure based Snowflake account) needs to be done in the same process as a regular HDFS/ADLS read/write operation, users need to write credentials for the regular HDFS/ADLS read/write operations to the same core-site location (bodo.HDFS_CORE_SITE_LOC) due to limitations of Arrow and HDFS. Bodo will modify the file (temporarily) during the Snowflake write operation(s) and then restore the original configuration for the regular HDFS/ADLS read/write operations.

Here is an example of how to do so:

import bodo
import pandas as pd

# Initialize the temporary directory where the core-site file
# will be written
bodo.HDFS_CORE_SITE_LOC_DIR.initialize()


# Define the core-site for your regular ADLS/HDFS read/write
# operations
CORE_SITE_SPEC = """<configuration>
...
</configuration>
"""

# Write it to the temporary core-site file
# Do it on one rank on every node to avoid filesystem conflicts.
if bodo.get_rank() in bodo.get_nodes_first_ranks():
    with open(bodo.HDFS_CORE_SITE_LOC, 'w') as f:
        f.write(CORE_SITE_SPEC)


# Write the SAS token if required.
# import bodo_azurefs_sas_token_provider
# SAS_TOKEN = "..."
# if bodo.get_rank() in bodo.get_nodes_first_ranks():
#     with open(os.path.join(bodo.HDFS_CORE_SITE_LOC_DIR.name, "sas_token.txt"), 'w') as f:
#         f.write(SAS_TOKEN)


@bodo.jit
def etl_job():
    df = pd.read_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/file.pq")
    # ..
    # .. Some ETL Processing
    # ..
    # Here Bodo will replace the core-site configuration to enable Snowflake write
    df.to_sql("new_table", "snowflake://user:password@url/{db_name}/schema?warehouse=warehouse_name&role=role_name")
    # Once done, Bodo will restore the contents of the original file so that your
    # ADLS operations happen as expected.
    # ..
    # .. Some more ETL Processing
    # ..
    df.to_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/out_file.pq")
    return


etl_job()

Databases

Currently, Bodo supports most RDBMS that work with SQLAlchemy, with a corresponding driver.

Snowflake

Prerequisites

In order to be able to query Snowflake or write a dataframe to Snowflake from Bodo, installing the Snowflake connector is necessary (it is installed by default on Bodo Platform). If you are using Bodo in a conda environment:

conda install -c conda-forge snowflake-connector-python

If you have installed Bodo using pip, then you can install the Snowflake connector using pip as well:

pip install snowflake-connector-python

Reading from Snowflake

To read a dataframe from a Snowflake database, users can use pd.read_sql with their Snowflake username and password: pd.read_sql(query, "snowflake://<username>:<password>@url").

Usage

Bodo requires the Snowflake connection string to be passed as an argument to the pd.read_sql function. The complete code looks as follows:

import bodo
import pandas as pd

@bodo.jit
def read_snowflake(db_name, table_name):
    df = pd.read_sql(
            f"SELECT * FROM {table_name}",
            f"snowflake://user:password@url/{db_name}/schema?warehouse=warehouse_name",
        )
    return df
df = read_snowflake(db_name, temp_table_name)

  • _bodo_read_as_dict is a Bodo-specific argument that forces the specified string columns to be read with dictionary encoding. Bodo automatically loads string columns using dictionary encoding when it determines this would be beneficial based on a heuristic. Dictionary encoding stores data in memory efficiently and is most effective when the column has many repeated values. Read more about the dictionary-encoded layout here. Bodo will raise a warning if the specified columns are not present in the schema or if they are not of type string.

For example:
@bodo.jit()
def impl(query, conn):
    df = pd.read_sql(query, conn, _bodo_read_as_dict=["A", "B", "C"])
    return df

Writing to Snowflake

To write a dataframe to Snowflake using Bodo, users can use df.to_sql with their Snowflake username and password: df.to_sql(table_name, "snowflake://<username>:<password>@url").

Usage

Bodo requires the Snowflake connection string to be passed as an argument to the df.to_sql function. The complete code looks as follows:

import bodo
import pandas as pd


@bodo.jit(distributed=["df"])
def write_to_snowflake(df, table_name):
    df.to_sql(
        table_name,
        "snowflake://user:password@url/db_name/schema?warehouse=warehouse_name&role=role_name",
    )

write_to_snowflake(df, table_name)

Note

  • Writing Pandas Dataframe index to Snowflake is not supported. If index and/or index_label are provided, they will be ignored.
  • if_exists="append" is needed if you want to append to a table that already exists in Snowflake (see the example after this note).
  • chunksize argument is supported and used for writing parquet files to Snowflake stage in batches. It is the maximum number of rows to write to any of the intermediate parquet files. When not provided, Bodo will estimate and use a reasonable chunk size.
  • dtype and method arguments are not supported and will be ignored if provided.
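
For instance, appending to an existing table while capping the intermediate Parquet files at 100,000 rows each could look like the following sketch (the table name and connection string are placeholders):

@bodo.jit(distributed=["df"])
def append_to_snowflake(df, table_name):
    df.to_sql(
        table_name,
        "snowflake://user:password@url/db_name/schema?warehouse=warehouse_name",
        if_exists="append",
        chunksize=100_000,
    )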

Required Snowflake Permissions

Creating a new table

To create a new table, the role being used must have the USAGE permission at the database level. In addition, the following permissions are required at the Schema level:

  • USAGE
  • CREATE TABLE
  • CREATE STAGE

Replacing an existing table

To replace an existing table (i.e. when using if_exists='replace'), the role must be an owner of the table, in addition to having the permissions listed in the create section (at the Database and Schema level).

Appending to an existing table

To append to an existing table (i.e. when using if_exists='append'), the role must have the INSERT permission at the table level, in addition to the permissions listed in the create section (at the Database and Schema level).

Verifying your role's permissions in Snowflake

You can check the permissions granted to your role in Snowflake using the SHOW GRANTS command, e.g.:

show grants to role test_role

Alternatively, you can check the permissions at the database/schema/table level and verify that your role has the required permissions, e.g.:

-- Check that your role has the USAGE permission on the database
show grants on database test_db
-- Check that your role has the required permissions on the schema
show grants on schema test_schema
-- Check that your role has the required permissions on the table
show grants on table test_schema."TEST_TABLE"

You can also use the Snowsight UI to check these permissions by navigating to the Privileges section of the Details tab of the database/schema/table.

Advanced Configuration Options

Bodo provides highly performant distributed Snowflake write capability. This is done by writing parquet files to a Snowflake stage and then using Snowflake's COPY INTO to load the data into the Snowflake table. Based on the type of your Snowflake account (i.e. whether it is an AWS or Azure or GCP based account) and your environment (i.e. whether certain packages and modules are installed), Bodo will use one of two strategies to write to Snowflake: Direct Upload (preferred) or Put Upload.

  1. Direct Upload: In this strategy, Bodo creates a temporary stage and writes parquet files to it directly. Once these files are written, Bodo will automatically execute the COPY INTO command to copy the data into Snowflake. This is supported on S3 and ADLS based Snowflake stages (used by AWS and Azure based Snowflake accounts, respectively). Note that Bodo will drop the temporary stage once the data has been written. Temporary stages are also automatically cleaned up by Snowflake after the session ends.

    Note

    For writing to ADLS based stages, you must have Hadoop setup correctly (see more details here) and have the bodo-azurefs-sas-token-provider package installed (it can be installed using conda install bodo-azurefs-sas-token-provider -c bodo.ai -c conda-forge). Bodo will fall back to the Put Upload strategy if both these conditions are not met. Also see Interleaving HDFS/ADLS I/O with Snowflake Write.

    Note that this is only applicable to on-prem use cases since all of this is pre-configured on the Bodo Platform.

  2. Put Upload: In this strategy, Bodo creates a 'named' stage, writes parquet files to a temporary directory locally and then uses the Snowflake Python Connector to upload these files to this named stage using the PUT command. Similar to the Direct Upload strategy, once the files have been transferred, Bodo will automatically execute the COPY INTO command to copy the data into Snowflake. This is used for GCS based stages (used by GCP based Snowflake accounts), or when the user environment doesn't have all the required packages and modules to use the Direct Upload strategy.

    Similar to the Direct Upload strategy, Bodo will drop the named stage after the data has been written to the table.

    Note

    In some cases, e.g. during abnormal exits, Bodo may not be able to drop these stages, which may require manual cleanup by the user. All stages created by Bodo are of the form "bodo_io_snowflake_{random-uuid}". You can list all stages created by Bodo in Snowflake by executing the SHOW STAGES command:

    show stages like 'bodo_io_snowflake_%';
    

    and then delete them using the DROP STAGE command, e.g.:

    drop stage "bodo_io_snowflake_02ca9beb-eaf6-4957-a6ff-ff426441cd7a";
    

    These operations are not supported in Bodo and must be executed directly through the Snowflake console.

Direct Upload is the preferred strategy and used by default whenever possible. When this is not possible, Bodo will show a warning to the user. For optimal Snowflake ingestion performance, Bodo writes multiple small intermediate parquet files on each rank, instead of a single file like it does for regular parquet writes.

Users can set the environment variable BODO_SF_WRITE_DEBUG (to any value), to get more details during the Snowflake write process. This includes printing the raw result of the COPY INTO command execution, and printing more detailed error messages in case of exceptions.

Bodo exposes the following configuration variables for tuning the write process further (for advanced users only):

  • SF_WRITE_UPLOAD_USING_PUT: This configuration variable can be set to True to force Bodo to use the Put Upload strategy. This is False by default since Direct Upload is the preferred strategy.
  • SF_WRITE_PARQUET_COMPRESSION: This configuration variable can be set to specify the compression method used for writing the intermediate Parquet files. Supported values include "snappy", "gzip" and "zstd". Bodo uses "snappy" by default since it provided the best overall performance in our benchmarks; however, this can vary based on your data.
  • SF_WRITE_PARQUET_CHUNK_SIZE: This configuration variable can be used to specify the chunk size to use when writing the dataframe to the intermediate Parquet files. This is measured by the uncompressed memory usage of the dataframe (in bytes). Note that when provided, df.to_sql's chunksize parameter (see description in note in the Usage section) overrides this value.
  • SF_WRITE_OVERLAP_UPLOAD: For maximum performance, the Put Upload strategy writes intermediate parquet files to local disk and uploads them to the stage in parallel. To alter this behavior, i.e. write the parquet files and upload them sequentially, this configuration variable can be set to False. Note that this is only applicable when using the Put Upload strategy.

These can be set as follows:

import bodo
import bodo.io.snowflake

bodo.io.snowflake.SF_WRITE_UPLOAD_USING_PUT = False
bodo.io.snowflake.SF_WRITE_PARQUET_COMPRESSION = "gzip"
...

MySQL

Prerequisites

In addition to sqlalchemy, installing pymysql is required. If you are using Bodo in a conda environment:

conda install pymysql -c conda-forge

If you have installed Bodo using pip:

pip install PyMySQL

Usage

Reading result of a SQL query in a dataframe:

import bodo
import pandas as pd


@bodo.jit(distributed=["df"])
def read_mysql(table_name, conn):
    df = pd.read_sql(
            f"SELECT * FROM {table_name}",
            conn
        )
    return df


table_name = "test_table"
conn = f"mysql+pymysql://{username}:{password}@{host}/{db_name}"
df = read_mysql(table_name, conn)

Writing dataframe as a table in the database:

import bodo
import pandas as pd


@bodo.jit(distributed=["df"])
def write_mysql(df, table_name, conn):
    df.to_sql(table_name, conn)


table_name = "test_table"
df = pd.DataFrame({"A": [1.12, 1.1] * 5, "B": [213, -7] * 5})
conn = f"mysql+pymysql://{username}:{password}@{host}/{db_name}"
write_mysql(df, table_name, conn)

Oracle Database

Prerequisites

In addition to sqlalchemy, install cx_oracle and the Oracle Instant Client driver. If you are using Bodo in a conda environment:

conda install cx_oracle -c conda-forge

If you have installed Bodo using pip:

pip install cx-Oracle
  • Then, Download "Basic" or "Basic light" package matching your operating system from here.
  • Unzip package and add it to LD_LIBRARY_PATH environment variable.

Note

For Linux, the libaio package is required as well.

  • conda: conda install libaio -c conda-forge
  • pip: pip install libaio

See cx_oracle for more information. Alternatively, the Oracle Instant Client can be downloaded automatically using wget or curl. Here is an example of automatic installation on a Linux machine:

conda install cx_oracle libaio -c conda-forge
mkdir -p /opt/oracle
cd /opt/oracle
wget https://download.oracle.com/otn_software/linux/instantclient/215000/instantclient-basic-linux.x64-21.5.0.0.0dbru.zip
unzip instantclient-basic-linux.x64-21.5.0.0.0dbru.zip
export LD_LIBRARY_PATH=/opt/oracle/instantclient_21_5:$LD_LIBRARY_PATH

Usage

Reading result of a SQL query in a dataframe:

import bodo
import pandas as pd


@bodo.jit(distributed=["df"])
def read_oracle(table_name, conn):
    df = pd.read_sql(
            f"SELECT * FROM {table_name}",
            conn
        )
    return df


table_name = "test_table"
conn = f"oracle+cx_oracle://{username}:{password}@{host}/{db_name}"
df = read_oracle(table_name, conn)

Writing dataframe as a table in the database:

import bodo
import pandas as pd


@bodo.jit(distributed=["df"])
def write_oracle(df, table_name, conn):
    df.to_sql(table_name, conn)


table_name = "test_table"
df = pd.DataFrame({"A": [1.12, 1.1] * 5, "B": [213, -7] * 5})
conn = f"oracle+cx_oracle://{username}:{password}@{host}/{db_name}"
write_oracle(df, table_name, conn)

PostgreSQL

Prerequisites

In addition to sqlalchemy, install psycopg2.

If you are using Bodo in a conda environment:

conda install psycopg2 -c conda-forge

If you have installed Bodo using pip:

pip install psycopg2

Usage

Reading result of a SQL query in a dataframe:

import bodo
import pandas as pd


@bodo.jit(distributed=["df"])
def read_postgresql(table_name, conn):
    df = pd.read_sql(
            f"SELECT * FROM {table_name}",
            conn
        )
    return df


table_name = "test_table"
conn = f"postgresql+psycopg2://{username}:{password}@{host}/{db_name}"
df = read_postgresql(table_name, conn)

Writing dataframe as a table in the database:

import bodo
import pandas as pd


@bodo.jit(distributed=["df"])
def write_postgresql(df, table_name, conn):
    df.to_sql(table_name, conn)


table_name = "test_table"
df = pd.DataFrame({"A": [1.12, 1.1] * 5, "B": [213, -7] * 5})
conn = f"postgresql+psycopg2://{username}:{password}@{host}/{db_name}"
write_postgresql(df, table_name, conn)

Specifying I/O Data Types Manually

In some rare use cases, the dataset path cannot be a constant value or a JIT function argument. In such cases, the path is determined dynamically, which does not allow automatic Bodo data type inference. Therefore, the user has to provide the data types manually. For example, the names and dtype keyword arguments of pd.read_csv and pd.read_excel allow the user to specify the data types:

@bodo.jit
def example_csv(fname1, fname2, flag):
    if flag:
        file_name = fname1
    else:
        file_name = fname2
    return pd.read_csv(file_name, names = ["A", "B", "C"], dtype={"A": int, "B": float, "C": str})

For other pandas read functions, the existing APIs do not currently allow this information to be provided. Users can still provide typing information in the bodo.jit decorator, similar to Numba's typing syntax. For example:

@bodo.jit(locals={"df":{"one": bodo.float64[:],
                  "two": bodo.string_array_type,
                  "three": bodo.bool_[:],
                  "four": bodo.float64[:],
                  "five": bodo.string_array_type,
                  }})
def example_df_schema(fname1, fname2, flag):
    if flag:
        file_name = fname1
    else:
        file_name = fname2
    df = pd.read_parquet(file_name)
    return df


 @bodo.jit(locals={"X": bodo.float64[:,:], "Y": bodo.float64[:]})
 def example_h5(fname1, fname2, flag):
    if flag:
        file_name = fname1
    else:
        file_name = fname2
     f = h5py.File(file_name, "r")
     X = f["points"][:]
     Y = f["responses"][:]

For the complete list of supported types, please see the pandas dtype section. In the event that the dtypes are improperly specified, Bodo will throw a runtime error.

Warning

Providing data types manually is error-prone and should be avoided as much as possible.