Scalable Data I/O¶
Efficient parallel data processing requires data I/O to be parallelized effectively as well. Bodo provides parallel file I/O for many different formats such as Parquet, CSV, JSON, Numpy binaries, HDF5 and SQL databases. This diagram demonstrates how chunks of data are partitioned among parallel execution engines by Bodo.
Bodo automatically parallelizes I/O for any number of cores and cluster size without any additional API layers.
I/O Workflow¶
Make sure I/O calls for large data are inside JIT functions to allow Bodo to parallelize I/O and divide the data across cores automatically (see below for supported formats and APIs).
Warning
Performing I/O in regular Python (outside JIT functions) replicates data on all Python processes, which can result in out-of-memory errors if the data is large. For example, a 1 GB dataframe replicated on 1000 cores consumes 1 TB of memory.
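The arithmetic behind this warning can be checked with a small sketch. The helper names below are hypothetical and not part of Bodo; they only compare the total memory used when every process holds a full copy against the approximate per-process share when the data is divided evenly (decimal units assumed).

```python
def replicated_memory_gb(data_gb: float, n_procs: int) -> float:
    """Total memory if every process holds a full copy of the data."""
    return data_gb * n_procs


def partitioned_memory_gb(data_gb: float, n_procs: int) -> float:
    """Approximate per-process memory if the data is divided evenly."""
    return data_gb / n_procs


# The example from the warning: a 1 GB dataframe on 1000 cores.
total = replicated_memory_gb(1.0, 1000)
per_proc = partitioned_memory_gb(1.0, 1000)
print(f"replicated total: {total:.0f} GB")
print(f"partitioned per process: {per_proc:.3f} GB")
```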
Bodo looks at the schema of the input dataset during compilation time to infer
the datatype of the resulting dataframe.
This requires the dataset path to be available to the compiler. The path should be either a constant string value, an argument
to the JIT function, or a simple combination of the two.
For example, the following code passes the dataset path as an argument, allowing Bodo to infer the data type of df:
import os
import pandas as pd
import bodo

data_path = os.environ["JOB_DATA_PATH"]

@bodo.jit
def f(path):
    df = pd.read_parquet(path)
    print(df.A.sum())

f(data_path)
Concatenating arguments and constant values also works:
import os
import pandas as pd
import bodo

data_root = os.environ["JOB_DATA_ROOT"]

@bodo.jit
def f(root):
    df = pd.read_parquet(root + "/table1.pq")
    print(df.A.sum())

f(data_root)
In the rare case that the path should be a dynamic value inside JIT functions, the data types have to be specified manually (see Specifying I/O Data Types Manually). This is error-prone and should be avoided as much as possible.
Supported Data Formats¶
Currently, Bodo supports I/O for Parquet, CSV, SQL, JSON, HDF5, and Numpy binaries formats. It can read these formats from multiple filesystems, including S3, HDFS and Azure Data Lake (ADLS) (see File Systems below for more information). Many databases and data warehouses such as Snowflake are supported as well.
Also see supported pandas APIs for supported arguments of I/O functions.
Parquet¶
Parquet is a commonly used file format in analytics due to its efficient columnar storage. Bodo supports the standard pandas API for reading Parquet: pd.read_parquet(path), where path can be a parquet file, a directory with multiple parquet files (all are part of the same dataframe), a glob pattern, a list of files, or a list of glob patterns:
import pandas as pd
import bodo

@bodo.jit
def write_pq(df):
    df.to_parquet("s3://bucket-name/example.pq")

@bodo.jit
def read_pq():
    df = pd.read_parquet("s3://bucket-name/example.pq")
    return df
Note
Bodo reads datasets in parallel using multiple cores while ensuring that the number of rows read on all cores is roughly equal. The size and number of row groups can affect parallel read performance significantly. Currently, reading any number of rows in Bodo requires reading at least one row-group. To read even a single row from a parquet dataset, the entire row-group containing that row (and its corresponding metadata) needs to be read first, and then the required row is extracted from it. Therefore, for best parallel read performance, there must be sufficient row-groups to minimize the number of instances where multiple cores need to read from the same row group. This means there must be at least as many row groups as the number of cores, but ideally a lot more. At the same time, the size of the row-groups should not be too small since this can lead to overheads. For more details about parquet file format, refer to the format specification.
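To make the row-group advice concrete, here is a minimal sketch, assuming rows are divided into contiguous, roughly equal ranges per core. It is an illustration only and not Bodo's actual partitioning logic; the function name `row_groups_per_core` is hypothetical. It lists which row groups each core would have to read.

```python
from bisect import bisect_right
from itertools import accumulate


def row_groups_per_core(row_group_sizes, n_cores):
    """For each core, list the row groups overlapping its share of rows."""
    # ends[i] is the global index one past the last row of row group i
    ends = list(accumulate(row_group_sizes))
    total = ends[-1]
    result = []
    for rank in range(n_cores):
        # contiguous, roughly equal row range [start, stop) for this core
        start = rank * total // n_cores
        stop = (rank + 1) * total // n_cores
        if start == stop:
            result.append([])
            continue
        first = bisect_right(ends, start)     # group containing row `start`
        last = bisect_right(ends, stop - 1)   # group containing row `stop - 1`
        result.append(list(range(first, last + 1)))
    return result


# One large row group: all 4 cores must read the same group.
print(row_groups_per_core([100], 4))
# Eight row groups: each core reads its own two groups.
print(row_groups_per_core([25] * 8, 4))
```

With a single row group every core reads the whole group to extract its own rows, while with eight row groups the reads do not overlap.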
to_parquet(name) with distributed data writes to a folder called name. Each process writes one file into the folder, but if the data is not distributed, to_parquet(name) writes to a single file called name:
df = pd.DataFrame({"A": range(10)})

@bodo.jit
def example1_pq(df):
    df.to_parquet("example1.pq")

@bodo.jit(distributed={"df"})
def example2_pq(df):
    df.to_parquet("example2.pq")

if bodo.get_rank() == 0:
    example1_pq(df)
example2_pq(df)
When the code above is run with 4 processors, example1_pq(df) writes a single file, and example2_pq(df) writes a folder containing 4 parquet files:
.
├── example1.pq
├── example2.pq
│  ├── part-00.parquet
│  ├── part-01.parquet
│  ├── part-02.parquet
│  └── part-03.parquet
See read_parquet(), to_parquet() for supported arguments.
Filter Pushdown and Column Pruning¶
Bodo can detect filters used by the code and optimize the read_parquet
call by pushing the filters down to the storage layer, so that only the
rows required by the program are read.
In addition, Bodo only reads the columns that are used in the program,
and prunes the unused columns.
These optimizations can significantly speed up
I/O in many cases and can substantially reduce the program's memory footprint.
For example, suppose we have a large dataset with many columns that spans many years, and we only need to read revenue data for a particular year:
@bodo.jit
def query():
    df = pd.read_parquet("s3://my-bucket/data.pq")
    df = df[df["year"] == 2021]
    return df.groupby("customer_key")["revenue"].max()
When compiling the above, Bodo detects the df[df["year"] == 2021] filter and optimizes the read_parquet call so that it only reads data for year 2021 from S3. This requires the dataframe filtering operation to be in the same JIT function as read_parquet, and the dataframe variable must not be used before filtering. Bodo also makes sure only the customer_key and revenue columns are read, since other columns are not used in the program.
In general, if the dataset is hive-partitioned and partition columns appear in
filter expressions, only the files that contain relevant data are read,
and the rest are discarded based on their path. For example, if year is a partition column above and we have a dataset:
.
└── data.pq/
    │   ...
    ├── year=2020/
    │   ├── part-00.parquet
    │   └── part-01.parquet
    └── year=2021/
        ├── part-02.parquet
        └── part-03.parquet
Bodo will only read the files in the year=2021 directory.
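The path-based pruning described above can be sketched in a few lines. This is only an illustration of the idea, not Bodo's implementation; the helper name `prune_by_partition` is hypothetical, and the file list mirrors the directory layout shown in the example.

```python
def prune_by_partition(paths, column, value):
    """Keep only files whose path contains the `column=value` directory."""
    wanted = f"{column}={value}"
    return [p for p in paths if wanted in p.split("/")]


files = [
    "data.pq/year=2020/part-00.parquet",
    "data.pq/year=2020/part-01.parquet",
    "data.pq/year=2021/part-02.parquet",
    "data.pq/year=2021/part-03.parquet",
]

# Only the year=2021 files survive; the others are never opened.
selected = prune_by_partition(files, "year", 2021)
print(selected)
```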
For non-partition columns, Bodo may discard files entirely just by looking at their parquet metadata (depending on the filters and statistics contained in the metadata) or filter the rows during read.
Note
Filter pushdown is often a very important optimization and is critical for keeping the memory footprint manageable in big data workloads. Make sure filtering happens in the same JIT function right after the dataset read (or that JIT functions for I/O are inlined, see inlining).
Exploring Large Data Without Full Read¶
Exploring a large dataset often requires seeing its shape and a sample of the data. Bodo is able to provide this information quickly without loading the full Parquet dataset, which means there is no need for a large cluster with a lot of memory. For example:
@bodo.jit
def head_only_read():
    df = pd.read_parquet("s3://my-bucket/example.pq")
    print(df.shape)
    print(df.head())
In this example, Bodo provides the shape information for the full dataset in df.shape, but only loads the first few rows that are necessary for df.head().
CSV¶
CSV is a common text format for data exchange. Bodo supports most of the standard pandas API to read CSV files:
import pandas as pd
import bodo

@bodo.jit
def write_csv(df):
    df.to_csv("s3://my-bucket/example.csv")

@bodo.jit
def read_csv():
    df = pd.read_csv("s3://my-bucket/example.csv")
    return df
Unlike read_csv in regular pandas, Bodo can also read a directory that contains multiple partitioned CSV files. All files in the folder must have the same number and data types of columns. They can have different numbers of rows.
Usage:
@bodo.jit
def read_csv_folder():
    df = pd.read_csv("s3://my-bucket/path/to/folder/foldername")
    doSomething(df)
Use sep="\n" to read text files line by line into a single-column dataframe (without creating separate columns; this is useful when text data is unstructured or there are too many columns to read efficiently):
@bodo.jit
def read_test():
    df = pd.read_csv("example.csv", sep="\n", names=["value"], dtype={"value": "str"})
    return df
Note
Bodo uses nullable integer types of pandas to ensure type stability (see integer NA issue in pandas for more details). Therefore, data types must be specified explicitly for accurate performance comparisons of Bodo and pandas for read_csv.
to_csv(name) has different behaviors for different file systems:
- POSIX file systems: always writes to a single file, regardless of the number of processes and whether the data is distributed, but writing is still done in parallel when more than 1 processor is used:
    df = pd.DataFrame({"A": np.arange(n)})

    @bodo.jit
    def example1_csv(df):
        df.to_csv("example1.csv")

    @bodo.jit(distributed={"df"})
    def example2_csv(df):
        df.to_csv("example2.csv")

    if bodo.get_rank() == 0:
        example1_csv(df)
    example2_csv(df)
  When the code above is run with 4 processors, example1_csv(df) and example2_csv(df) each write to a single file.
- S3 and HDFS: distributed data is written to a folder called name. Each process writes one file into the folder, but if the data is not distributed, to_csv(name) writes to a single file called name:
    df = pd.DataFrame({"A": np.arange(n)})

    @bodo.jit
    def example1_csv(df):
        df.to_csv("s3://bucket-name/example1.csv")

    @bodo.jit(distributed={"df"})
    def example2_csv(df):
        df.to_csv("s3://bucket-name/example2.csv")

    if bodo.get_rank() == 0:
        example1_csv(df)
    example2_csv(df)
  When the code above is run with 4 processors, example1_csv(df) writes a single file, and example2_csv(df) writes a folder containing 4 csv files.
See read_csv(), to_csv() for supported arguments.
JSON¶
For JSON, the syntax is also the same as pandas.
Usage:
@bodo.jit
def example_write_json(df, fname):
    df.to_json(fname)

@bodo.jit
def example_read_json_lines_format():
    df = pd.read_json("example.json", orient="records", lines=True)

@bodo.jit
def example_read_json_multi_lines():
    df = pd.read_json("example_file.json", orient="records", lines=False,
                      dtype={"A": float, "B": "bool", "C": int})
Note
- The dtype argument is required when reading a regular multi-line JSON file.
- Bodo cannot read a directory containing multiple multi-line JSON files.
- Bodo's default values for orient and lines are records and False respectively.
to_json(name) has different behaviors for different file systems:
- POSIX file systems: to_json(name) behavior depends on the orient and lines arguments.
    - DataFrame.to_json(name, orient="records", lines=True) (i.e. writing JSON Lines text file format) always writes to a single file, regardless of the number of processes and whether the data is distributed, but writing is still done in parallel when more than 1 processor is used:
        df = pd.DataFrame({"A": np.arange(n)})

        @bodo.jit
        def example1_json(df):
            df.to_json("example1.json", orient="records", lines=True)

        @bodo.jit(distributed={"df"})
        def example2_json(df):
            df.to_json("example2.json", orient="records", lines=True)

        if bodo.get_rank() == 0:
            example1_json(df)
        example2_json(df)
      When the code above is run with 4 processors, example1_json(df) and example2_json(df) each write to a single file.
    - All other combinations of values for orient and lines have the same behavior as S3 and HDFS explained below.
- S3 and HDFS: distributed data is written to a folder called name. Each process writes one file into the folder, but if the data is not distributed, to_json(name) writes to a file called name:
    df = pd.DataFrame({"A": np.arange(n)})

    @bodo.jit
    def example1_json(df):
        df.to_json("s3://bucket-name/example1.json")

    @bodo.jit(distributed={"df"})
    def example2_json(df):
        df.to_json("s3://bucket-name/example2.json")

    if bodo.get_rank() == 0:
        example1_json(df)
    example2_json(df)
  When the code above is run with 4 processors, example1_json(df) writes a single file, and example2_json(df) writes a folder containing 4 json files:
    .
    ├── example1.json
    ├── example2.json
    │   ├── part-00.json
    │   ├── part-01.json
    │   ├── part-02.json
    │   └── part-03.json
See read_json(), to_json() for supported arguments.
SQL¶
See Databases for the list of supported Relational Database Management Systems (RDBMS) with Bodo.
For SQL, the syntax is also the same as pandas. For reading:
@bodo.jit
def example_read_sql():
    df = pd.read_sql("select * from employees", "mysql+pymysql://<username>:<password>@<host>/<db_name>")
See read_sql() for supported arguments.
For writing:
@bodo.jit
def example_write_sql(df):
    df.to_sql("table_name", "mysql+pymysql://<username>:<password>@<host>/<db_name>")
See to_sql() for supported arguments.
Note
sqlalchemy must be installed in order to use pandas.read_sql.
Filter Pushdown and Column Pruning¶
Similar to Parquet read, the Bodo JIT compiler is able to push down filters to the data source and prune unused columns automatically. For example, this program reads data from a very large Snowflake table, but only needs limited rows and columns:
@bodo.jit
def filter_ex(conn, int_val):
    df = pd.read_sql("SELECT * FROM LINEITEM", conn)
    df = df[(df["l_orderkey"] > 10) & (int_val >= df["l_linenumber"])]
    result = df["l_suppkey"]
    print(result)

filter_ex(conn, 2)
Bodo optimizes the query passed to read_sql to push filters down and prune unused columns. In this case, Bodo will replace SELECT * FROM LINEITEM with the optimized version automatically:
SELECT "L_SUPPKEY" FROM (SELECT * FROM LINEITEM) as TEMP
WHERE ( ( l_orderkey > 10 ) AND ( l_linenumber <= 2 ) )
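The shape of the rewritten query can be reproduced with a small string-building sketch. This is an illustration only: Bodo performs the rewrite inside its compiler, and the helper `push_down` below is a hypothetical name, not a Bodo API. It uses plain string formatting, so it should not be used with untrusted input.

```python
def push_down(base_query, columns, predicates):
    """Wrap `base_query` so that only `columns` and matching rows are fetched."""
    select_list = ", ".join(f'"{c.upper()}"' for c in columns)
    where = " AND ".join(f"( {p} )" for p in predicates)
    return f"SELECT {select_list} FROM ({base_query}) as TEMP WHERE ( {where} )"


int_val = 2
query = push_down(
    "SELECT * FROM LINEITEM",
    columns=["l_suppkey"],
    # `int_val >= df["l_linenumber"]` is equivalent to `l_linenumber <= int_val`
    predicates=["l_orderkey > 10", f"l_linenumber <= {int_val}"],
)
print(query)
```

Note how the Python comparison with the scalar on the left is flipped so that the column appears on the left side of the SQL predicate.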
Delta Lake¶
Reading parquet files from Delta Lake is supported locally, from S3, and from Azure ADLS.
- The Delta Lake binding Python package needs to be installed using pip: pip install deltalake.
- For S3, the AWS_DEFAULT_REGION environment variable should be set to the region of the bucket hosting the Delta Lake table.
- For ADLS, the AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_KEY environment variables need to be set.
Example code for reading:
Note
Writing is currently not supported.
Iceberg¶
Bodo's support for Iceberg Tables is under active development and currently only supports basic read and write functionality.
Bodo supports reading Iceberg tables stored in a directory on HDFS, either locally or from S3, through Pandas' read_sql_table API.
- Bodo's Iceberg Connector Python package needs to be installed using conda: conda install bodo-iceberg-connector -c bodo.ai -c conda-forge.
- For tables on S3, the credentials should be set either using environment variables, or AWS configuration in ~/.aws, or using an instance profile on the EC2 instance.
Iceberg connection strings vary by catalog, but in general are of the form iceberg<+conn>://<path><?params> where
- <conn>://<path> is the location of the catalog or Iceberg warehouse
- params is a list of properties to pass to the catalog. Each parameter must be of the form <key>=<value> and separated with &, similar to HTTP URLs.
The following parameters are officially supported:
- type: Type of catalog. The supported values are listed below. When the connection string is ambiguous, this parameter is used to determine the type of catalog implementation.
- warehouse: Location of the warehouse. Required when creating a new table using Glue, Nessie, or Hive catalog.
The following catalogs are supported:
- Hadoop Catalog on Local Filesystem
    - Used when type=hadoop is specified or when <conn> is file or empty
    - <path> is the absolute path to the warehouse (directory containing the database schema)
    - Parameter warehouse will be ignored if specified
    - E.g. iceberg://<ABSOLUTE PATH TO ICEBERG WAREHOUSE> or iceberg+file://<ABSOLUTE PATH TO ICEBERG WAREHOUSE>
- Hadoop Catalog on S3
    - Used when type=hadoop-s3 is specified or when <conn> is s3
    - <conn>://<path> is the S3 path to the warehouse (directory or bucket containing the database schema)
    - Parameter warehouse will be ignored if specified
    - E.g. iceberg+s3://<S3 PATH TO ICEBERG WAREHOUSE>
- Dremio Arctic or Nessie Catalog
    - Must specify type=nessie as a parameter to use this warehouse.
    - <conn>://<path> is the URL to the Nessie catalog, which can be found on Dremio's dashboard.
        - It will look like the following: https://nessie.dremio.cloud/v1/projects/<PROJECT ID>
        - <PROJECT ID> is the Nessie project UUID
    - The following parameters are required:
        - authentication.type=BEARER
        - authentication.token=<AUTH TOKEN> where <AUTH TOKEN> is your personal Dremio authentication token and can be found on the dashboard
    - Parameter warehouse is required to create a table
    - E.g. iceberg+https://nessie.dremio.cloud/v1/projects/<PROJECT ID>?type=nessie&authentication.type=BEARER&authentication.token=<AUTH TOKEN>
- AWS Glue Catalog
    - Connection string must be of the form iceberg+glue?<params>
    - Parameter type will be ignored if specified
    - Parameter warehouse is required to create a table
    - E.g. iceberg+glue or iceberg+glue?warehouse=s3://<ICEBERG-BUCKET>
- Hive / Thrift Catalog
    - Used when type=hive is specified or when <conn> is thrift
    - <conn>://<path> is the URL to the Thrift catalog, e.g. thrift://localhost:9083
    - Parameter warehouse is required to create the table
    - E.g. iceberg+thrift://<THRIFT URL>
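The general form iceberg<+conn>://<path><?params> can be assembled programmatically. The following is a minimal sketch using only the standard library; the helper name `iceberg_conn_str` and the bucket, path, and token values are hypothetical placeholders, and the helper does not validate which parameters a given catalog requires.

```python
from urllib.parse import urlencode


def iceberg_conn_str(conn="", path="", params=None):
    """Assemble iceberg<+conn>://<path><?params> from its parts."""
    prefix = f"iceberg+{conn}" if conn else "iceberg"
    location = f"://{path}" if path else ""
    # keep ':' and '/' readable in values such as s3://bucket
    query = f"?{urlencode(params, safe=':/')}" if params else ""
    return prefix + location + query


# Hadoop catalog on S3 (hypothetical bucket and path)
print(iceberg_conn_str("s3", "my-bucket/warehouse"))
# Glue catalog with a warehouse location, needed when creating a table
print(iceberg_conn_str("glue", params={"warehouse": "s3://my-bucket"}))
# Nessie catalog with the required authentication parameters
print(iceberg_conn_str(
    "https",
    "nessie.dremio.cloud/v1/projects/1234",
    params={
        "type": "nessie",
        "authentication.type": "BEARER",
        "authentication.token": "abc",
    },
))
```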
Example code for reading:
@bodo.jit
def example_read_iceberg():
    df = pd.read_sql_table(
        table_name="<NAME OF ICEBERG TABLE>",
        con="<SEE PREVIOUS SECTION ON HOW TO FORMAT THIS FOR DIFFERENT CATALOGS>",
        schema="<NAME OF ICEBERG DATABASE SCHEMA>"
    )
Note
- The schema argument is required for reading Iceberg tables.
- The Iceberg table to read should be located at <warehouse-location>/<schema>/<table_name>, where schema and table_name are the arguments to pd.read_sql_table, and warehouse-location is inferred from the connection string based on the description provided above.
Warning
- Tables with delete files or those that have gone through schema evolution are not supported yet.
Bodo has basic support for writing Iceberg tables from Pandas Dataframes using the to_sql API, including support for appending to tables with an existing partition spec and/or sort order.
Example code for writing:
@bodo.jit(distributed=["df"])
def write_iceberg_table(df: pd.DataFrame):
    df.to_sql(
        name="<NAME OF ICEBERG TABLE>",
        con="<SEE PREVIOUS SECTION ON HOW TO FORMAT THIS FOR DIFFERENT CATALOGS>",
        schema="<NAME OF ICEBERG DATABASE SCHEMA>",
        if_exists="replace"
    )
Note
- The schema argument is required for writing Iceberg tables.
- Writing Pandas Dataframe index to an Iceberg table is not supported. If index and index_label are provided, they will be ignored.
- chunksize, dtype and method arguments are not supported and will be ignored if provided.
- While Bodo can append to tables with an existing partition spec and/or sort order, it does not support creating new tables with a Partition Spec or Sort Order.
Numpy binaries¶
Numpy's fromfile and tofile are supported as below:
@bodo.jit
def example_np_io():
    A = np.fromfile("myfile.dat", np.float64)
    ...
    A.tofile("newfile.dat")
Bodo has the same behavior as Numpy for numpy.ndarray.tofile(), where we always write to a single file. However, writing distributed data to POSIX is done in parallel, but writing to S3 & HDFS is done sequentially (due to file system limitations).
HDF5¶
HDF5 is a common format in scientific computing, especially for multi-dimensional numerical data. HDF5 can be very efficient at scale, since it has native parallel I/O support. For HDF5, the syntax is the same as the h5py package. For example:
@bodo.jit
def example_h5():
    f = h5py.File("data.hdf5", "r")
    X = f["points"][:]
    Y = f["responses"][:]
File Systems¶
Amazon S3¶
Reading and writing CSV, Parquet, JSON, and Numpy binary files from and to Amazon S3 is supported.
The file path should start with s3://.
These environment variables are used for File I/O with S3 credentials:
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_DEFAULT_REGION: defaults to us-east-1
- AWS_S3_ENDPOINT: specifies a custom host name; defaults to the AWS endpoint (s3.amazonaws.com)
Connecting to S3 endpoints through a proxy is supported. The proxy URI can be provided by setting one of the following environment variables (listed in order of precedence):
- http_proxy
- https_proxy
- HTTP_PROXY
- HTTPS_PROXY
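The precedence order above can be expressed as a simple lookup. The sketch below is an illustration of "first variable that is set wins", not the code Bodo or Arrow uses internally; `resolve_proxy` is a hypothetical helper, and the proxy addresses are made-up values.

```python
import os

# Variables in the order of precedence listed above
PROXY_VARS = ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY")


def resolve_proxy(env=None):
    """Return the first non-empty proxy URI among PROXY_VARS, or None."""
    env = os.environ if env is None else env
    for name in PROXY_VARS:
        value = env.get(name)
        if value:
            return value
    return None


# https_proxy is listed before HTTPS_PROXY, so it takes precedence.
example_env = {
    "HTTPS_PROXY": "http://proxy-b.example:3128",
    "https_proxy": "http://proxy-a.example:3128",
}
print(resolve_proxy(example_env))
```

Passing a plain dictionary instead of `os.environ` makes the lookup easy to check without changing the real environment.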
By default, Bodo uses Apache Arrow internally for read and write of data on S3. If additional storage_options are provided to pd.read_parquet that Arrow does not support, then S3FS will be used instead. It can be optionally installed via:
Google Cloud Storage¶
Reading and writing Parquet files from and to Google Cloud is supported.
The file path should start with gs:// or gcs://.
This environment variable is used for File I/O with GCS credentials:
- GOOGLE_APPLICATION_CREDENTIALS
Details for GOOGLE_APPLICATION_CREDENTIALS can be found in the Google Cloud documentation.
Bodo uses the fsspec-based gcsfs library internally for read and write of data on GCS.
Hadoop Distributed File System (HDFS) and Azure Data Lake Storage (ADLS) Gen2¶
Reading and writing CSV, Parquet, JSON, and Numpy binary files from and to Hadoop Distributed File System (HDFS) is supported. Note that Azure Data Lake Storage Gen2 can be accessed through HDFS.
The openjdk version 11 package must be available, and the file path should start with hdfs:// or abfs[s]://.
These environment variables are used for File I/O with HDFS:
- HADOOP_HOME: the root of your installed Hadoop distribution. It often contains lib/native/libhdfs.so.
- ARROW_LIBHDFS_DIR: location of libhdfs. Often is $HADOOP_HOME/lib/native.
- CLASSPATH: must contain the Hadoop jars. You can set these using:
Bodo uses Apache Arrow internally for read and write of data on HDFS. $HADOOP_HOME/etc/hadoop/hdfs-site.xml provides default behaviors for the HDFS client used by Bodo. Inconsistent configurations (e.g. dfs.replication) could potentially cause errors in Bodo programs.
Setting up HDFS/ADLS Credentials¶
Hadoop Filesystem sources its credentials from the first available core-site.xml file on the CLASSPATH. When Hadoop is set up (including on Bodo Platform), this file is usually created at $HADOOP_HOME/etc/hadoop/core-site.xml automatically. You can edit this file and set credentials appropriately. You can also write the core-site configuration to bodo.HDFS_CORE_SITE_LOC, which is a temporary file Bodo adds to the CLASSPATH when it is initialized:
import bodo
import pandas as pd

# Initialize the temporary directory where the core-site file
# will be written
bodo.HDFS_CORE_SITE_LOC_DIR.initialize()

# Define the core-site for your regular ADLS/HDFS read/write
# operations
CORE_SITE_SPEC = """
<configuration>
...
</configuration>
"""

# Write it to the temporary core-site file.
# Do it on one rank on every node to avoid filesystem conflicts.
if bodo.get_rank() in bodo.get_nodes_first_ranks():
    with open(bodo.HDFS_CORE_SITE_LOC, 'w') as f:
        f.write(CORE_SITE_SPEC)

@bodo.jit
def etl_job():
    df = pd.read_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/file.pq")
    # ..
    # .. Some ETL Processing
    # ..
    df.to_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/out_file.pq")
    return

etl_job()
There are several authorization methods for reading from or writing to ADLS Storage Containers, all of which require slightly different core-site configurations. Here are some examples:
- Using an ADLS Access Key
    import bodo
    import pandas as pd

    # Initialize the temporary directory where the core-site file
    # will be written
    bodo.HDFS_CORE_SITE_LOC_DIR.initialize()

    # Define the core-site for your regular ADLS/HDFS read/write
    # operations
    CORE_SITE_SPEC = """<configuration>
      <property>
        <name>fs.abfs.impl</name>
        <value>org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem</value>
      </property>
      <property>
        <name>fs.azure.account.auth.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net</name>
        <value>SharedKey</value>
      </property>
      <property>
        <name>fs.azure.account.key.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net</name>
        <value>{ADLS_SECRET}</value>
        <description> The ADLS storage account access key itself.</description>
      </property>
    </configuration>
    """

    # Write it to the temporary core-site file.
    # Do it on one rank on every node to avoid filesystem conflicts.
    if bodo.get_rank() in bodo.get_nodes_first_ranks():
        with open(bodo.HDFS_CORE_SITE_LOC, 'w') as f:
            f.write(CORE_SITE_SPEC)

    # Run your ETL job
    @bodo.jit
    def etl_job():
        df = pd.read_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/file.pq")
        # ..
        # .. Some ETL Processing
        # ..
        df.to_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/out_file.pq")
        return

    etl_job()
- Using a SAS Token
  To use SAS Tokens, you need to install the bodo-azurefs-sas-token-provider package (it can be installed using conda install bodo-azurefs-sas-token-provider -c bodo.ai -c conda-forge). This is already installed on the Bodo Platform. Then in your program, do the following:
    import bodo
    # Importing this module adds the required JARs to the CLASSPATH
    import bodo_azurefs_sas_token_provider
    import pandas as pd
    import os

    # Initialize the temporary directory where the core-site file
    # will be written
    bodo.HDFS_CORE_SITE_LOC_DIR.initialize()

    # Define the core-site for your regular ADLS/HDFS read/write
    # operations
    CORE_SITE_SPEC = """<configuration>
      <property>
        <name>fs.azure.account.auth.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net</name>
        <value>SAS</value>
      </property>
      <property>
        <name>fs.azure.sas.token.provider.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net</name>
        <value>org.bodo.azurefs.sas.BodoSASTokenProvider</value>
      </property>
      <property>
        <name>fs.abfs.impl</name>
        <value>org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem</value>
      </property>
    </configuration>
    """

    # Write it to the temporary core-site file.
    # Do it on one rank on every node to avoid filesystem conflicts.
    if bodo.get_rank() in bodo.get_nodes_first_ranks():
        with open(bodo.HDFS_CORE_SITE_LOC, 'w') as f:
            f.write(CORE_SITE_SPEC)

    # Load the SAS token here.
    SAS_TOKEN = "..."

    # Write SAS Token to a file.
    # Do it on one rank on every node to avoid filesystem conflicts.
    if bodo.get_rank() in bodo.get_nodes_first_ranks():
        with open(os.path.join(bodo.HDFS_CORE_SITE_LOC_DIR.name, "sas_token.txt"), 'w') as f:
            f.write(SAS_TOKEN)

    # Run your ETL job
    @bodo.jit
    def etl_job():
        df = pd.read_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/file.pq")
        # ..
        # .. Some ETL Processing
        # ..
        df.to_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/out_file.pq")
        return

    etl_job()
Interleaving HDFS/ADLS I/O with Snowflake Write¶
For writing to an Azure based Snowflake account using the direct upload strategy (see Snowflake Write), Bodo writes the required core-site configuration to bodo.HDFS_CORE_SITE_LOC automatically. In cases where Snowflake write (to an Azure based Snowflake account) needs to be done in the same process as a regular HDFS/ADLS read/write operation, users need to write credentials for the regular HDFS/ADLS read/write operations to the same core-site location (bodo.HDFS_CORE_SITE_LOC) due to limitations of Arrow and HDFS. Bodo will modify the file (temporarily) during the Snowflake write operation(s) and then restore the original configuration for the regular HDFS/ADLS read/write operations.
Here is an example of how to do so:
import bodo
import pandas as pd

# Initialize the temporary directory where the core-site file
# will be written
bodo.HDFS_CORE_SITE_LOC_DIR.initialize()

# Define the core-site for your regular ADLS/HDFS read/write
# operations
CORE_SITE_SPEC = """<configuration>
...
</configuration>
"""

# Write it to the temporary core-site file
# Do it on one rank on every node to avoid filesystem conflicts.
if bodo.get_rank() in bodo.get_nodes_first_ranks():
    with open(bodo.HDFS_CORE_SITE_LOC, 'w') as f:
        f.write(CORE_SITE_SPEC)

# Write the SAS token if required.
# import bodo_azurefs_sas_token_provider
# SAS_TOKEN = "..."
# if bodo.get_rank() in bodo.get_nodes_first_ranks():
#     with open(os.path.join(bodo.HDFS_CORE_SITE_LOC_DIR.name, "sas_token.txt"), 'w') as f:
#         f.write(SAS_TOKEN)

@bodo.jit
def etl_job():
    df = pd.read_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/file.pq")
    # ..
    # .. Some ETL Processing
    # ..
    # Here Bodo will replace the core-site configuration to enable Snowflake write
    df.to_sql("new_table", "snowflake://user:password@url/{db_name}/schema?warehouse=warehouse_name&role=role_name")
    # Once done, Bodo will restore the contents of the original file so that your
    # ADLS operations happen as expected.
    # ..
    # .. Some more ETL Processing
    # ..
    df.to_parquet("abfs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/out_file.pq")
    return

etl_job()
Databases¶
Currently, Bodo supports most RDBMS that work with SQLAlchemy, with a corresponding driver.
Snowflake¶
Prerequisites¶
In order to be able to query Snowflake or write a dataframe to Snowflake from Bodo, installing the Snowflake connector is necessary (it is installed by default on Bodo Platform). If you are using Bodo in a conda environment:
If you have installed Bodo using pip, then you can install the Snowflake connector using pip as well:
Reading from Snowflake¶
To read a dataframe from a Snowflake database, users can use pd.read_sql with their Snowflake username and password: pd.read_sql(query, "snowflake://<username>:<password>@url").
Usage¶
Bodo requires the Snowflake connection string to be passed as an argument to the pd.read_sql function. The complete code looks as follows:
import bodo
import pandas as pd

@bodo.jit
def read_snowflake(db_name, table_name):
    df = pd.read_sql(
        f"SELECT * FROM {table_name}",
        f"snowflake://user:password@url/{db_name}/schema?warehouse=warehouse_name",
    )
    return df

df = read_snowflake(db_name, temp_table_name)
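The connection string used above can be assembled from its parts before it is passed to the JIT function. The sketch below uses only the standard library; `snowflake_conn_str` is a hypothetical helper, all argument values are placeholders, and percent-encoding the credentials is an assumption based on standard URL parsing rules (as used by SQLAlchemy-style URLs).

```python
from urllib.parse import quote, urlencode


def snowflake_conn_str(user, password, account_url, db_name, schema, **params):
    """Build snowflake://user:password@url/db_name/schema?key=value&..."""
    # percent-encode credentials so characters like '@' or ':' stay unambiguous
    auth = f"{quote(user, safe='')}:{quote(password, safe='')}"
    query = f"?{urlencode(params)}" if params else ""
    return f"snowflake://{auth}@{account_url}/{db_name}/{schema}{query}"


conn = snowflake_conn_str(
    "user",
    "p@ss:word",          # placeholder; load real credentials from a secure source
    "myaccount",
    "db_name",
    "schema",
    warehouse="warehouse_name",
    role="role_name",
)
print(conn)
```

Building the string outside the JIT function and passing it in as an argument keeps it available to the compiler, in the same way as dataset paths.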
_bodo_read_as_dict is a Bodo-specific argument which forces the specified string columns to be read with dictionary-encoding. Bodo automatically loads string columns using dictionary encoding when it determines it would be beneficial based on a heuristic. Dictionary-encoding stores data in memory in an efficient manner and is most effective when the column has many repeated values. Read more about dictionary-encoded layout here.
Bodo will raise a warning if the specified columns are not present in the schema or if they are not of type string.
For example:
```py
@bodo.jit()
def impl(query, conn):
    df = pd.read_sql(query, conn, _bodo_read_as_dict=["A", "B", "C"])
    return df
```
Writing to Snowflake¶
To write a dataframe to Snowflake using Bodo, users can use df.to_sql with their Snowflake username and password: df.to_sql(table_name, "snowflake://<username>:<password>@url").
Usage¶
Bodo requires the Snowflake connection string to be passed as an argument to the df.to_sql function. The complete code looks as follows:
import bodo
import pandas as pd

@bodo.jit(distributed=["df"])
def write_to_snowflake(df, table_name):
    df.to_sql(
        table_name,
        "snowflake://user:password@url/db_name/schema?warehouse=warehouse_name&role=role_name",
    )

write_to_snowflake(df, table_name)
Note
- Writing Pandas Dataframe index to Snowflake is not supported. If index and/or index_label are provided, they will be ignored.
- if_exists=append is needed if you want to append to a table that already exists in Snowflake.
- chunksize argument is supported and used for writing parquet files to Snowflake stage in batches. It is the maximum number of rows to write to any of the intermediate parquet files. When not provided, Bodo will estimate and use a reasonable chunk size.
- dtype and method arguments are not supported and will be ignored if provided.
Required Snowflake Permissions¶
Creating a new table¶
To create a new table, the role being used must have the USAGE permission at the database level. In addition, the following permissions are required at the Schema level:
- USAGE
- CREATE TABLE
- CREATE STAGE
Replacing an existing table¶
To replace an existing table (i.e. when using if_exists='replace'), the role must be an owner of the table, in addition to having the permissions listed in the create section (at the Database and Schema level).
Appending to an existing table¶
To append to an existing table (i.e. when using if_exists='append'), the role must have the INSERT permission at the table level, in addition to the permissions listed in the create section (at the Database and Schema level).
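The three cases above can be summarized as a small lookup, which may be handy when reviewing a role before a job runs. This is only a sketch of the rules stated in this section: `required_privileges` is a hypothetical helper, `"fail"` is assumed to be the pandas default for if_exists (a new table is created), and table ownership is written as Snowflake's OWNERSHIP privilege.

```python
# Privileges needed to create a table, as listed in the create section.
CREATE_PRIVILEGES = {
    "database": ["USAGE"],
    "schema": ["USAGE", "CREATE TABLE", "CREATE STAGE"],
}

# Additional table-level requirement for each if_exists mode.
TABLE_REQUIREMENTS = {
    "fail": [],               # new table is created, nothing table-level
    "replace": ["OWNERSHIP"],  # role must own the existing table
    "append": ["INSERT"],
}


def required_privileges(if_exists="fail"):
    """Return the privileges needed per level for a given if_exists mode."""
    if if_exists not in TABLE_REQUIREMENTS:
        raise ValueError(f"unknown if_exists value: {if_exists!r}")
    required = {level: list(p) for level, p in CREATE_PRIVILEGES.items()}
    required["table"] = list(TABLE_REQUIREMENTS[if_exists])
    return required


print(required_privileges("append"))
```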
Verifying your role's permissions in Snowflake¶
You can check the permissions granted to your role in Snowflake using the SHOW GRANTS command, e.g. SHOW GRANTS TO ROLE <role_name>.
Alternatively, you can check the permissions at the database/schema/table level and verify that your role has the required permissions, e.g.:
-- Check that your role has the USAGE permission on the database
show grants on database test_db
-- Check that your role has the required permissions on the schema
show grants on schema test_schema
-- Check that your role has the required permissions on the table
show grants on table test_schema."TEST_TABLE"
You can also use the Snowsight UI to check these permissions by navigating to the Privileges section of the Details tab of the database/schema/table.
Advanced Configuration Options¶
Bodo provides highly performant distributed Snowflake write capability.
This is done by writing parquet files to a Snowflake stage and then using Snowflake's COPY INTO to load the data into the Snowflake table.
Based on the type of your Snowflake account (i.e. whether it is an AWS, Azure or GCP based account) and your environment (i.e. whether certain packages and modules are installed), Bodo will use one of two strategies to write to Snowflake: Direct Upload (preferred) or Put Upload.
- Direct Upload: In this strategy, Bodo creates a temporary stage and writes parquet files to it directly. Once these files are written, Bodo will automatically execute the COPY INTO command to copy the data into Snowflake. This is supported on S3 and ADLS based Snowflake stages (used by AWS and Azure based Snowflake accounts, respectively). Note that Bodo will drop the temporary stage once the data has been written. Temporary stages are also automatically cleaned up by Snowflake after the session ends.

  Note
  For writing to ADLS based stages, you must have Hadoop set up correctly (see more details here) and have the bodo-azurefs-sas-token-provider package installed (it can be installed using conda install bodo-azurefs-sas-token-provider -c bodo.ai -c conda-forge). Bodo will fall back to the Put Upload strategy unless both of these conditions are met. Also see Interleaving HDFS/ADLS I/O with Snowflake Write.

  Note that this is only applicable to on-prem use cases since all of this is pre-configured on the Bodo Platform.

- Put Upload: In this strategy, Bodo creates a 'named' stage, writes parquet files to a temporary directory locally and then uses the Snowflake Python Connector to upload these files to this named stage using the PUT command. Similar to the Direct Upload strategy, once the files have been transferred, Bodo will automatically execute the COPY INTO command to copy the data into Snowflake. This is used for GCS based stages (used by GCP based Snowflake accounts), or when the user environment doesn't have all the required packages and modules to use the Direct Upload strategy.

  Similar to the Direct Upload strategy, Bodo will drop the named stage after the data has been written to the table.
Note
In some cases, e.g. during abnormal exits, Bodo may not be able to drop these stages, which may require manual cleanup by the user. All stages created by Bodo are of the form "bodo_io_snowflake_{random-uuid}". You can list all stages created by Bodo in Snowflake by executing the SHOW STAGES command, and then delete them using the DROP STAGE command. These operations are not supported in Bodo and must be executed directly through the Snowflake console.
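For the manual cleanup described in the note above, the DROP STAGE statements can be prepared from a list of stage names (for example, names copied from SHOW STAGES output). This is a sketch only: `drop_statements` is a hypothetical helper, the stage names in the example are made up, and the generated statements still have to be run in the Snowflake console.

```python
BODO_STAGE_PREFIX = "bodo_io_snowflake_"


def drop_statements(stage_names):
    """Return DROP STAGE statements for stages that look Bodo-created."""
    statements = []
    for name in stage_names:
        if name.lower().startswith(BODO_STAGE_PREFIX):
            # Quote the identifier (UUIDs contain hyphens) and double any
            # embedded quote characters.
            quoted = '"' + name.replace('"', '""') + '"'
            statements.append(f"DROP STAGE IF EXISTS {quoted};")
    return statements


# Made-up names for illustration.
names = [
    "bodo_io_snowflake_1b4e28ba-2fa1-11d2-883f-0016d3cca427",
    "MY_OTHER_STAGE",
]
for stmt in drop_statements(names):
    print(stmt)
```

Only names starting with the Bodo prefix are selected, so unrelated stages are left alone.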
Direct Upload is the preferred strategy and used by default whenever possible. When this is not possible, Bodo will show a warning to the user. For optimal Snowflake ingestion performance, Bodo writes multiple small intermediate parquet files on each rank, instead of a single file like it does for regular parquet writes.
Users can set the environment variable BODO_SF_WRITE_DEBUG (to any value) to get more details during the Snowflake write process. This includes printing the raw result of the COPY INTO command execution, and printing more detailed error messages in case of exceptions.
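One way to set the variable is from the script itself, before any Snowflake write runs. This sketch assumes the variable is read from the process environment; since it is not stated here when Bodo reads it, setting it before importing bodo is the cautious choice. The value `"1"` is arbitrary.

```python
import os

# Any value enables the extra output; "1" is an arbitrary choice.
# Set this before importing bodo to be safe.
os.environ["BODO_SF_WRITE_DEBUG"] = "1"

print("BODO_SF_WRITE_DEBUG" in os.environ)
```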
Bodo exposes the following configuration variables for tuning the write process further (for advanced users only):
- SF_WRITE_UPLOAD_USING_PUT: This configuration variable can be set to True to force Bodo to use the Put Upload strategy. This is False by default since Direct Upload is the preferred strategy.
- SF_WRITE_PARQUET_COMPRESSION: This configuration variable can be set to specify the compression method used for writing the intermediate Parquet files. Supported values include: "snappy", "gzip" and "zstd". Bodo uses "snappy" by default since it provided the best overall performance in our benchmarks; however, this can vary based on your data.
- SF_WRITE_PARQUET_CHUNK_SIZE: This configuration variable can be used to specify the chunk size to use when writing the dataframe to the intermediate Parquet files. This is measured by the uncompressed memory usage of the dataframe (in bytes). Note that when provided, df.to_sql's chunksize parameter (see description in note in the Usage section) overrides this value.
These can be set as follows:
import bodo
import bodo.io.snowflake
bodo.io.snowflake.SF_WRITE_UPLOAD_USING_PUT = False
bodo.io.snowflake.SF_WRITE_PARQUET_COMPRESSION = "gzip"
...
MySQL¶
Prerequisites¶
In addition to sqlalchemy, installing pymysql is required.
If you are using Bodo in a conda environment:
conda install pymysql -c conda-forge
If you have installed Bodo using pip:
pip install pymysql
Usage¶
Reading result of a SQL query in a dataframe:
import bodo
import pandas as pd
@bodo.jit(distributed=["df"])
def read_mysql(table_name, conn):
    df = pd.read_sql(
        f"SELECT * FROM {table_name}",
        conn
    )
    return df
table_name = "test_table"
conn = f"mysql+pymysql://{username}:{password}@{host}/{db_name}"
df = read_mysql(table_name, conn)
Writing dataframe as a table in the database:
import bodo
import pandas as pd
@bodo.jit(distributed=["df"])
def write_mysql(df, table_name, conn):
    df.to_sql(table_name, conn)
table_name = "test_table"
df = pd.DataFrame({"A": [1.12, 1.1] * 5, "B": [213, -7] * 5})
conn = f"mysql+pymysql://{username}:{password}@{host}/{db_name}"
write_mysql(df, table_name, conn)
Oracle Database¶
Prerequisites¶
In addition to sqlalchemy, install cx_oracle and the Oracle Instant Client driver.
If you are using Bodo in a conda environment:
conda install cx_oracle -c conda-forge
If you have installed Bodo using pip:
pip install cx_oracle
- Then, download the "Basic" or "Basic Light" package matching your operating system from here.
- Unzip the package and add it to the LD_LIBRARY_PATH environment variable.
Note
On Linux, the libaio package is required as well.
- conda:
conda install libaio -c conda-forge
- pip:
pip install libaio
See cx_oracle for more information.
Alternatively, the Oracle Instant Client driver can be downloaded automatically using wget or curl commands.
Here's an example of automatic installation on a Linux machine.
conda install cx_oracle libaio -c conda-forge
mkdir -p /opt/oracle
cd /opt/oracle
wget https://download.oracle.com/otn_software/linux/instantclient/215000/instantclient-basic-linux.x64-21.5.0.0.0dbru.zip
unzip instantclient-basic-linux.x64-21.5.0.0.0dbru.zip
export LD_LIBRARY_PATH=/opt/oracle/instantclient_21_5:$LD_LIBRARY_PATH
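Before connecting, it can be useful to confirm that the Instant Client directory actually ended up on LD_LIBRARY_PATH in the process that runs your code. The check below is a small standard-library sketch. `on_library_path` is a hypothetical helper, and the directory matches the one used in the installation example above.

```python
import os


def on_library_path(directory, ld_library_path=None):
    """Return True if directory is an entry of LD_LIBRARY_PATH."""
    if ld_library_path is None:
        ld_library_path = os.environ.get("LD_LIBRARY_PATH", "")
    target = os.path.normpath(directory)
    # LD_LIBRARY_PATH is a colon-separated list; skip empty entries.
    entries = [os.path.normpath(p) for p in ld_library_path.split(":") if p]
    return target in entries


print(on_library_path(
    "/opt/oracle/instantclient_21_5",
    "/opt/oracle/instantclient_21_5:/usr/local/lib",
))
```

Calling `on_library_path("/opt/oracle/instantclient_21_5")` without the second argument checks the current process environment instead.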
Usage¶
Reading result of a SQL query in a dataframe:
import bodo
import pandas as pd
@bodo.jit(distributed=["df"])
def read_oracle(table_name, conn):
    df = pd.read_sql(
        f"SELECT * FROM {table_name}",
        conn
    )
    return df
table_name = "test_table"
conn = f"oracle+cx_oracle://{username}:{password}@{host}/{db_name}"
df = read_oracle(table_name, conn)
Writing dataframe as a table in the database:
import bodo
import pandas as pd
@bodo.jit(distributed=["df"])
def write_oracle(df, table_name, conn):
    df.to_sql(table_name, conn)
table_name = "test_table"
df = pd.DataFrame({"A": [1.12, 1.1] * 5, "B": [213, -7] * 5})
conn = f"oracle+cx_oracle://{username}:{password}@{host}/{db_name}"
write_oracle(df, table_name, conn)
PostgreSQL¶
Prerequisites¶
In addition to sqlalchemy, install psycopg2.
If you are using Bodo in a conda environment:
conda install psycopg2 -c conda-forge
If you have installed Bodo using pip:
pip install psycopg2
Usage¶
Reading result of a SQL query in a dataframe:
import bodo
import pandas as pd
@bodo.jit(distributed=["df"])
def read_postgresql(table_name, conn):
    df = pd.read_sql(
        f"SELECT * FROM {table_name}",
        conn
    )
    return df
table_name = "test_table"
conn = f"postgresql+psycopg2://{username}:{password}@{host}/{db_name}"
df = read_postgresql(table_name, conn)
Writing dataframe as a table in the database:
import bodo
import pandas as pd
@bodo.jit(distributed=["df"])
def write_postgresql(df, table_name, conn):
    df.to_sql(table_name, conn)
table_name = "test_table"
df = pd.DataFrame({"A": [1.12, 1.1] * 5, "B": [213, -7] * 5})
conn = f"postgresql+psycopg2://{username}:{password}@{host}/{db_name}"
write_postgresql(df, table_name, conn)
Specifying I/O Data Types Manually¶
In some rare use cases, the dataset path cannot be a constant value or a JIT function argument. In such cases, the path is determined dynamically, which does not allow automatic Bodo data type inference. Therefore, the user has to provide the data types manually.
For example, the names and dtype keyword arguments of pd.read_csv and pd.read_excel allow the user to specify the data types:
@bodo.jit
def example_csv(fname1, fname2, flag):
    if flag:
        file_name = fname1
    else:
        file_name = fname2
    return pd.read_csv(file_name, names=["A", "B", "C"], dtype={"A": int, "B": float, "C": str})
For other pandas read functions, the existing APIs do not currently allow this information to be provided. Users can still provide typing information in the bodo.jit decorator, similar to Numba's typing syntax.
For example:
@bodo.jit(locals={"df": {"one": bodo.float64[:],
                         "two": bodo.string_array_type,
                         "three": bodo.bool_[:],
                         "four": bodo.float64[:],
                         "five": bodo.string_array_type,
                         }})
def example_df_schema(fname1, fname2, flag):
    if flag:
        file_name = fname1
    else:
        file_name = fname2
    df = pd.read_parquet(file_name)
    return df
@bodo.jit(locals={"X": bodo.float64[:,:], "Y": bodo.float64[:]})
def example_h5(fname1, fname2, flag):
    if flag:
        file_name = fname1
    else:
        file_name = fname2
    f = h5py.File(file_name, "r")
    X = f["points"][:]
    Y = f["responses"][:]
For the complete list of supported types, please see the pandas dtype section. In the event that the dtypes are improperly specified, Bodo will throw a runtime error.
Warning
Providing data types manually is error-prone and should be avoided as much as possible.