Scalable Data I/O¶
Efficient parallel data processing requires data I/O to be parallelized effectively as well. Bodo provides parallel file I/O for many different formats such as Parquet, CSV, JSON, Numpy binaries, HDF5 and SQL databases. This diagram demonstrates how chunks of data are partitioned among parallel execution engines by Bodo.
Bodo automatically parallelizes I/O for any number of cores and cluster size without any additional API layers.
I/O Workflow¶
Make sure I/O calls for large data are inside JIT functions to allow Bodo to parallelize I/O and divide the data across cores automatically (see below for supported formats and APIs).
Warning
Performing I/O in regular Python (outside JIT functions) replicates data on all Python processes, which can result in out-of-memory errors if the data is large. For example, a 1 GB dataframe replicated on 1000 cores consumes 1 TB of memory.
Bodo looks at the schema of the input dataset during compilation time to infer
the datatype of the resulting dataframe.
This requires the dataset path to be available to the compiler. The path should be either a constant string value, an argument to the JIT function, or a simple combination of the two.
For example, the following code passes the dataset path as an argument, allowing Bodo to infer the data type of df:
import os
import pandas as pd
import bodo

data_path = os.environ["JOB_DATA_PATH"]

@bodo.jit
def f(path):
    df = pd.read_parquet(path)
    print(df.A.sum())

f(data_path)
Concatenating arguments and constant values also works:
import os
import pandas as pd
import bodo

data_root = os.environ["JOB_DATA_ROOT"]

@bodo.jit
def f(root):
    df = pd.read_parquet(root + "/table1.pq")
    print(df.A.sum())

f(data_root)
In the rare case that the path should be a dynamic value inside JIT functions, the data types have to be specified manually (see Specifying I/O Data Types Manually). This is error-prone and should be avoided as much as possible.
Supported Data Formats¶
Currently, Bodo supports I/O for Parquet, CSV, SQL, JSON, HDF5, and Numpy binaries formats. It can read these formats from multiple filesystems, including S3, HDFS and Azure Data Lake (ADLS) (see File Systems below for more information). Many databases and data warehouses such as Snowflake are supported as well.
Also see supported pandas APIs for supported arguments of I/O functions.
Parquet¶
Parquet is a commonly used file format in analytics due to its efficient
columnar storage. Bodo supports the standard pandas API for reading
Parquet: pd.read_parquet(path), where path can be a parquet file, a directory with multiple parquet files (all part of the same dataframe), a glob pattern, a list of files, or a list of glob patterns:
import pandas as pd
import bodo

@bodo.jit
def write_pq(df):
    df.to_parquet("s3://bucket-name/example.pq")

@bodo.jit
def read_pq():
    df = pd.read_parquet("s3://bucket-name/example.pq")
    return df
Note
Bodo reads datasets in parallel using multiple cores while ensuring that the number of rows read on all cores is roughly equal. The size and number of row groups can affect parallel read performance significantly. Currently, reading any number of rows in Bodo requires reading at least one row-group. To read even a single row from a parquet dataset, the entire row-group containing that row (and its corresponding metadata) needs to be read first, and then the required row is extracted from it. Therefore, for best parallel read performance, there must be sufficient row-groups to minimize the number of instances where multiple cores need to read from the same row group. This means there must be at least as many row groups as the number of cores, but ideally a lot more. At the same time, the size of the row-groups should not be too small since this can lead to overheads. For more details about parquet file format, refer to the format specification.
to_parquet(name) with distributed data writes to a folder called name. Each process writes one file into the folder, but if the data is not distributed, to_parquet(name) writes to a single file called name:
df = pd.DataFrame({"A": range(10)})

@bodo.jit
def example1_pq(df):
    df.to_parquet("example1.pq")

@bodo.jit(distributed={"df"})
def example2_pq(df):
    df.to_parquet("example2.pq")

if bodo.get_rank() == 0:
    example1_pq(df)
example2_pq(df)
Run the code above with 4 processors: example1_pq(df) writes a single file, and example2_pq(df) writes a folder containing 4 parquet files:
.
├── example1.pq
├── example2.pq
│  ├── part-00.parquet
│  ├── part-01.parquet
│  ├── part-02.parquet
│  └── part-03.parquet
See read_parquet() and to_parquet() for supported arguments.
Filter Pushdown and Column Pruning¶
Bodo can detect filters used by the code and optimize the read_parquet
call by pushing the filters down to the storage layer, so that only the
rows required by the program are read.
In addition, Bodo only reads the columns that are used in the program,
and prunes the unused columns.
These optimizations can significantly speed up
I/O in many cases and can substantially reduce the program's memory footprint.
For example, suppose we have a large dataset with many columns that spans many years, and we only need to read revenue data for a particular year:
@bodo.jit
def query():
    df = pd.read_parquet("s3://my-bucket/data.pq")
    df = df[df["year"] == 2021]
    return df.groupby("customer_key")["revenue"].max()
When compiling the above, Bodo detects the df[df["year"] == 2021] filter and optimizes the read_parquet call so that it only reads data for year 2021 from S3.
This requires the dataframe filtering operation to be in the same JIT function as read_parquet, and the dataframe variable must not be used before filtering.
Bodo also makes sure only the customer_key and revenue columns are read, since the other columns are not used in the program.
In general, if the dataset is hive-partitioned and partition columns appear in
filter expressions, only the files that contain relevant data are read,
and the rest are discarded based on their path. For example, if year
is a partition column above and we have a dataset:
.
└── data.pq/
    ├── ...
    ├── year=2020/
    │   ├── part-00.parquet
    │   └── part-01.parquet
    └── year=2021/
        ├── part-02.parquet
        └── part-03.parquet
Bodo will only read the files in the year=2021 directory.
For non-partition columns, Bodo may discard files entirely just by looking at their parquet metadata (depending on the filters and statistics contained in the metadata) or filter the rows during read.
Note
Filter pushdown is often a very important optimization and critical for having manageable memory footprint in big data workloads. Make sure filtering happens in the same JIT function right after dataset read (or JIT functions for I/O are inlined, see inlining).
Exploring Large Data Without Full Read¶
Exploring large datasets often requires seeing the shape and a sample of the data. Bodo can provide this information quickly without loading the full Parquet dataset, which means there is no need for a large cluster with a lot of memory. For example:
@bodo.jit
def head_only_read():
    df = pd.read_parquet("s3://my-bucket/example.pq")
    print(df.shape)
    print(df.head())
In this example, Bodo provides the shape information for the full dataset in df.shape, but only loads the first few rows that are necessary for df.head().
CSV¶
CSV is a common text format for data exchange. Bodo supports most of the standard pandas API to read CSV files:
import pandas as pd
import bodo

@bodo.jit
def write_csv(df):
    df.to_csv("s3://my-bucket/example.csv")

@bodo.jit
def read_csv():
    df = pd.read_csv("s3://my-bucket/example.csv")
    return df
Unlike read_csv in regular pandas, Bodo can also read a directory that contains multiple partitioned CSV files. All files in the folder must have the same number and datatypes of columns, but they can have different numbers of rows.
Usage:
@bodo.jit
def read_csv_folder():
    df = pd.read_csv("s3://my-bucket/path/to/folder/foldername")
    doSomething(df)
Use sep="\n" to read text files line by line into a single-column dataframe (without creating separate columns; useful when text data is unstructured or there are too many columns to read efficiently):
@bodo.jit
def read_test():
    df = pd.read_csv("example.csv", sep="\n", names=["value"], dtype={"value": "str"})
    return df
Note
Bodo uses nullable integer types of pandas to ensure type stability (see the integer NA issue in pandas for more details). Therefore, data types must be specified explicitly for accurate performance comparisons of Bodo and pandas for read_csv.
to_csv(name) has different behaviors for different file systems:

- POSIX file systems: always writes to a single file, regardless of the number of processes and whether the data is distributed, but writing is still done in parallel when more than 1 processor is used:

      df = pd.DataFrame({"A": np.arange(n)})

      @bodo.jit
      def example1_csv(df):
          df.to_csv("example1.csv")

      @bodo.jit(distributed={"df"})
      def example2_csv(df):
          df.to_csv("example2.csv")

      if bodo.get_rank() == 0:
          example1_csv(df)
      example2_csv(df)

  Run the code above with 4 processors: both example1_csv(df) and example2_csv(df) write to a single file.

- S3 and HDFS: distributed data is written to a folder called name. Each process writes one file into the folder, but if the data is not distributed, to_csv(name) writes to a single file called name:

      df = pd.DataFrame({"A": np.arange(n)})

      @bodo.jit
      def example1_csv(df):
          df.to_csv("s3://bucket-name/example1.csv")

      @bodo.jit(distributed={"df"})
      def example2_csv(df):
          df.to_csv("s3://bucket-name/example2.csv")

      if bodo.get_rank() == 0:
          example1_csv(df)
      example2_csv(df)

  Run the code above with 4 processors: example1_csv(df) writes 1 single file, and example2_csv(df) writes a folder containing 4 csv files.
See read_csv() and to_csv() for supported arguments.
JSON¶
For JSON, the syntax is also the same as pandas.
Usage:
@bodo.jit
def example_write_json(df, fname):
    df.to_json(fname)

@bodo.jit
def example_read_json_lines_format():
    df = pd.read_json("example.json", orient="records", lines=True)

@bodo.jit
def example_read_json_multi_lines():
    df = pd.read_json("example_file.json", orient="records", lines=False,
                      dtype={"A": float, "B": "bool", "C": int})
Note
- The dtype argument is required when reading a regular multi-line JSON file.
- Bodo cannot read a directory containing multiple multi-line JSON files.
- Bodo's default values for orient and lines are "records" and False respectively.
to_json(name) has different behaviors for different file systems:

- POSIX file systems: to_json(name) behavior depends on the orient and lines arguments.

  - DataFrame.to_json(name, orient="records", lines=True) (i.e. writing the JSON Lines text file format) always writes to a single file, regardless of the number of processes and whether the data is distributed, but writing is still done in parallel when more than 1 processor is used:

        df = pd.DataFrame({"A": np.arange(n)})

        @bodo.jit
        def example1_json(df):
            df.to_json("example1.json", orient="records", lines=True)

        @bodo.jit(distributed={"df"})
        def example2_json(df):
            df.to_json("example2.json", orient="records", lines=True)

        if bodo.get_rank() == 0:
            example1_json(df)
        example2_json(df)

    Run the code above with 4 processors: both example1_json(df) and example2_json(df) write to a single file.

  - All other combinations of values for orient and lines have the same behavior as S3 and HDFS explained below.

- S3 and HDFS: distributed data is written to a folder called name. Each process writes one file into the folder, but if the data is not distributed, to_json(name) writes to a single file called name:

      df = pd.DataFrame({"A": np.arange(n)})

      @bodo.jit
      def example1_json(df):
          df.to_json("s3://bucket-name/example1.json")

      @bodo.jit(distributed={"df"})
      def example2_json(df):
          df.to_json("s3://bucket-name/example2.json")

      if bodo.get_rank() == 0:
          example1_json(df)
      example2_json(df)

  Run the code above with 4 processors: example1_json(df) writes 1 single file, and example2_json(df) writes a folder containing 4 json files:

      .
      ├── example1.json
      ├── example2.json
      │  ├── part-00.json
      │  ├── part-01.json
      │  ├── part-02.json
      │  └── part-03.json
See read_json() and to_json() for supported arguments.
SQL¶
See Databases for the list of supported Relational Database Management Systems (RDBMS) with Bodo.
For SQL, the syntax is also the same as pandas. For reading:
@bodo.jit
def example_read_sql():
    df = pd.read_sql("select * from employees", "mysql+pymysql://<username>:<password>@<host>/<db_name>")
See read_sql() for supported arguments.
For writing:
@bodo.jit
def example_write_sql(df):
    df.to_sql("table_name", "mysql+pymysql://<username>:<password>@<host>/<db_name>")
See to_sql() for supported arguments.
Note
sqlalchemy must be installed in order to use pandas.read_sql.
Filter Pushdown and Column Pruning¶
Similar to Parquet read, the Bodo JIT compiler can push down filters to the data source and prune unused columns automatically. For example, this program reads data from a very large Snowflake table, but only needs a limited set of rows and columns:
@bodo.jit
def filter_ex(conn, int_val):
    df = pd.read_sql("SELECT * FROM LINEITEM", conn)
    df = df[(df["l_orderkey"] > 10) & (int_val >= df["l_linenumber"])]
    result = df["l_suppkey"]
    print(result)

filter_ex(conn, 2)
Bodo optimizes the query passed to read_sql to push filters down and prune unused columns. In this case, Bodo will automatically replace SELECT * FROM LINEITEM with the optimized version:
SELECT "L_SUPPKEY" FROM (SELECT * FROM LINEITEM) as TEMP
WHERE ( ( l_orderkey > 10 ) AND ( l_linenumber <= 2 ) )
Delta Lake¶
Reading parquet files from Delta Lake is supported locally, from S3, and from Azure ADLS.
- The Delta Lake binding Python package needs to be installed using pip: pip install deltalake.
- For S3, the AWS_DEFAULT_REGION environment variable should be set to the region of the bucket hosting the Delta Lake table.
- For ADLS, the AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_KEY environment variables need to be set.
Example code for reading:
Note
Writing is currently not supported.
Iceberg¶
Bodo's support for Iceberg Tables is under active development and currently only supports
basic read functionality.
Bodo supports reading Iceberg tables stored in a directory on HDFS, either locally or from S3, through Pandas' read_sql_table API.
- Bodo's Iceberg Connector Python package needs to be installed using conda: conda install bodo-iceberg-connector -c bodo.ai -c conda-forge.
- For tables on S3, the credentials should be set using environment variables, AWS configuration in ~/.aws, or an instance profile on the EC2 instance.
Example code for reading:
@bodo.jit
def example_read_iceberg():
    df = pd.read_sql_table(
        table_name="<NAME OF ICEBERG TABLE>",
        con="<ABSOLUTE PATH TO ICEBERG WAREHOUSE>",
        schema="<NAME OF ICEBERG DATABASE SCHEMA>",
    )
Note
- The schema argument is required for reading Iceberg tables.
- The Iceberg table to read should be located at <con>/<schema>/<table_name>, where con, schema and table_name are the arguments to pd.read_sql_table.
Warning
- Tables with delete files or those that have gone through schema evolution are not supported yet.
- Writing Iceberg Tables is not supported yet.
Numpy binaries¶
Numpy's fromfile and tofile are supported as below:
@bodo.jit
def example_np_io():
    A = np.fromfile("myfile.dat", np.float64)
    ...
    A.tofile("newfile.dat")
Bodo has the same behavior as Numpy for numpy.ndarray.tofile(), where we always write to a single file. However, writing distributed data to POSIX is done in parallel, while writing to S3 and HDFS is done sequentially (due to file system limitations).
HDF5¶
HDF5 is a common format in scientific computing, especially for multi-dimensional numerical data. HDF5 can be very efficient at scale, since it has native parallel I/O support. For HDF5, the syntax is the same as the h5py package. For example:
import h5py
import bodo

@bodo.jit
def example_h5():
    f = h5py.File("data.hdf5", "r")
    X = f["points"][:]
    Y = f["responses"][:]
File Systems¶
Amazon S3¶
Reading and writing CSV, Parquet, JSON, and Numpy binary files from and to Amazon S3 is supported.
The fsspec package must be available, and the file path should start with s3://.
These environment variables are used for File I/O with S3 credentials:
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_DEFAULT_REGION: defaults to us-east-1
- AWS_S3_ENDPOINT: specify a custom host name; defaults to the AWS endpoint (s3.amazonaws.com)
Connecting to S3 endpoints through a proxy is supported. The proxy URI can be provided by setting one of the following environment variables (listed in order of precedence):
- http_proxy
- https_proxy
- HTTP_PROXY
- HTTPS_PROXY
Bodo uses Apache Arrow internally for read and write of data on S3.
Google Cloud Storage¶
Reading and writing Parquet files from and to Google Cloud is supported.
The file path should start with gs:// or gcs://.
- GOOGLE_APPLICATION_CREDENTIALS: details can be seen in the Google docs here.
Bodo uses the fsspec-based gcsfs library internally for read and write of data on GCS.
Hadoop Distributed File System (HDFS) and Azure Data Lake Storage (ADLS) Gen2¶
Reading and writing CSV, Parquet, JSON, and Numpy binary files from and to Hadoop Distributed File System (HDFS) is supported. Note that Azure Data Lake Storage Gen2 can be accessed through HDFS.
The openjdk version 8 package must be available, and the file path should start with hdfs:// or abfs[s]://.
These environment variables are used for File I/O with HDFS:
- HADOOP_HOME: the root of your installed Hadoop distribution; often contains lib/native/libhdfs.so.
- ARROW_LIBHDFS_DIR: location of libhdfs; often $HADOOP_HOME/lib/native.
- CLASSPATH: must contain the Hadoop jars. You can set these using:
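The commands themselves are missing here; a sketch of a typical setup follows (the /opt/hadoop path is an example, and hdfs classpath --glob prints the jar list for CLASSPATH):

```shell
export HADOOP_HOME=/opt/hadoop                    # example install root; adjust for your system
export ARROW_LIBHDFS_DIR=$HADOOP_HOME/lib/native  # directory containing libhdfs.so
export CLASSPATH=$($HADOOP_HOME/bin/hdfs classpath --glob)  # expand all Hadoop jars
```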
Bodo uses Apache Arrow internally for read and write of data on HDFS. $HADOOP_HOME/etc/hadoop/hdfs-site.xml provides default behaviors for the HDFS client used by Bodo. Inconsistent configurations (e.g. dfs.replication) could potentially cause errors in Bodo programs.
Databases¶
Currently, Bodo supports most RDBMS that work with SQLAlchemy, with a corresponding driver.
Snowflake¶
Reading from Snowflake¶
To read a dataframe from a Snowflake database, users can use pd.read_sql with their Snowflake username and password: pd.read_sql(query, "snowflake://<username>:<password>@url").
Prerequisites¶
In order to be able to query Snowflake from Bodo, installing the Snowflake connector is necessary (it is installed by default in Bodo Platform). If you are using Bodo in a conda environment:
If you have installed Bodo using pip, then you can install the Snowflake connector using pip as well:
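The install commands are missing here; they presumably resemble the following (snowflake-connector-python is the standard connector package name; check Bodo's installation notes for any version pinning):

```shell
# in a conda environment
conda install -c conda-forge snowflake-connector-python
# or, if Bodo was installed with pip
pip install snowflake-connector-python
```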
Usage¶
Bodo requires the Snowflake connection string to be passed as an argument to the pd.read_sql function. The complete code looks as follows:
import bodo
import pandas as pd

@bodo.jit
def read_snowflake(db_name, table_name):
    df = pd.read_sql(
        f"SELECT * FROM {table_name}",
        f"snowflake://user:password@url/{db_name}/schema?warehouse=warehouse_name",
    )
    return df

df = read_snowflake(db_name, temp_table_name)
Writing to Snowflake with COPY INTO¶
For the best performance when writing to Snowflake, we recommend first writing to cloud storage using DataFrame.to_parquet and then using a COPY INTO query to move the data from your cloud storage provider to your Snowflake table.
Our example uses Amazon S3, but the same approach works for ADLS, although you should follow the relevant Azure Snowflake documentation.
Usage¶
To upload your data to Snowflake, you first need to give Snowflake access to an S3 storage bucket. Then you can write to this bucket using Bodo. Finally, you need to issue a Snowflake query to transfer the data to Snowflake.
To get started you will need an AWS bucket dedicated to the output of your
Bodo write. Provide Snowflake access to this by executing steps 1-6 in
this document.
When executing step 4, you must make sure you are using the ACCOUNTADMIN
role within Snowflake to properly provide access. In step 6, when creating
the external stage, you must ensure you have the correct format for
parquet. Here is an example worksheet format:
use schema mydb.public;
create stage <stage name>
storage_integration = <integration name>
url = 's3://<bucket>/<path>/'
file_format = (type=parquet compression=auto);
In addition, you need to confirm that you have created the destination table. If you do not already have a table, you should create one with the create table command.
Here is an example table:
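The example table itself is missing here; as a sketch, a table matching the two columns ("A" and "B") used in the copy into example might be created like this (table and column names are illustrative):

```sql
create table copy_example ("A" number, "B" varchar);
```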
Now write your data to the bucket:
import bodo
import pandas as pd

@bodo.jit
def write_data(df):
    df.to_parquet(
        "s3://bucket-name/example.pq",
        index=False,
    )
Now you can transfer the data from S3 to Snowflake:
Parquet column names use a struct syntax in the COPY INTO query, which we illustrate with an actual example that copies the parquet data written above into the table:
copy into copy_example ("A", "B")
from (select $1:"A", $1:"B" from @copy_example_stage)
file_format = (type=parquet compression=auto);
Each column must be specified with $1:"COLUMN_NAME"
. These names must
exactly match the values in the parquet file and are case sensitive.
Note
If you are executing a regularly running job, you may want to consider
Snowpipe.
Snowpipe applies the same copy into
process, but it executes every
time new files are added to the storage bucket, making it ideal for
automating the data transfer.
Writing to Snowflake with DataFrame.to_sql¶
You can also write to snowflake using DataFrame.to_sql
, which writes using
the Snowflake Python connector.
Warning
Writing to Snowflake using DataFrame.to_sql is orders of magnitude slower than using COPY INTO.
Prerequisites¶
Install Snowflake Python connector to be able to write to Snowflake from Bodo (installed by default in Bodo Platform). Using Conda:
If you have installed Bodo using pip, then you can install the packages using pip as well:
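The install commands are missing here; they are presumably along these lines (the package names snowflake-connector-python and snowflake-sqlalchemy are assumptions based on the standard Snowflake Python stack; verify against current Bodo docs):

```shell
# in a conda environment
conda install -c conda-forge snowflake-connector-python snowflake-sqlalchemy
# or, if Bodo was installed with pip
pip install snowflake-connector-python snowflake-sqlalchemy
```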
Usage¶
Make sure the Snowflake connection string is passed to the DataFrame.to_sql method. For example:
import bodo
import pandas as pd

@bodo.jit
def write_snowflake(df, table_name, conn_str, schema):
    df.to_sql(
        table_name,
        conn_str,
        schema=schema,
        if_exists="append",
        index=False,
    )

write_snowflake(df, table_name, f"snowflake://{username}:{password}@url/{db_name}/public?warehouse=XL_WH", schema)
Note
- index=False is required as Snowflake does not support Indexes.
- if_exists="append" is needed if the table already exists in Snowflake.
- schema is recommended to avoid object permission issues.
MySQL¶
Prerequisites¶
In addition to sqlalchemy, installing pymysql is required.
If you are using Bodo in a conda environment:
If you have installed Bodo using pip:
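The install commands are missing here; they likely resemble the following (pymysql and sqlalchemy are the packages named above):

```shell
# in a conda environment
conda install -c conda-forge pymysql sqlalchemy
# or, if Bodo was installed with pip
pip install pymysql sqlalchemy
```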
Usage¶
Reading result of a SQL query in a dataframe:
import bodo
import pandas as pd
@bodo.jit(distributed=["df"])
def read_mysql(table_name, conn):
    df = pd.read_sql(
        f"SELECT * FROM {table_name}",
        conn,
    )
    return df

table_name = "test_table"
conn = f"mysql+pymysql://{username}:{password}@{host}/{db_name}"
df = read_mysql(table_name, conn)
Writing dataframe as a table in the database:
import bodo
import pandas as pd
@bodo.jit(distributed=["df"])
def write_mysql(df, table_name, conn):
    df.to_sql(table_name, conn)

table_name = "test_table"
df = pd.DataFrame({"A": [1.12, 1.1] * 5, "B": [213, -7] * 5})
conn = f"mysql+pymysql://{username}:{password}@{host}/{db_name}"
write_mysql(df, table_name, conn)
Oracle Database¶
Prerequisites¶
In addition to sqlalchemy, install cx_oracle and the Oracle Instant Client driver.
If you are using Bodo in a conda environment:
If you have installed Bodo using pip:
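The install commands are missing here; based on the automatic-installation example later in this section, they likely resemble the following (the pip package is published as cx_Oracle):

```shell
# in a conda environment
conda install -c conda-forge cx_oracle
# or, if Bodo was installed with pip
pip install cx_Oracle
```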
- Then, download the "Basic" or "Basic Light" package matching your operating system from here.
- Unzip the package and add its directory to the LD_LIBRARY_PATH environment variable.
Note
For Linux, the libaio package is required as well:

- conda: conda install libaio -c conda-forge
- pip: pip install libaio

See cx_oracle for more information.
Alternatively, the Oracle Instant Client driver can be downloaded automatically using the wget or curl commands.
Here's an example of automatic installation on a Linux OS machine.
conda install cx_oracle libaio -c conda-forge
mkdir -p /opt/oracle
cd /opt/oracle
wget https://download.oracle.com/otn_software/linux/instantclient/215000/instantclient-basic-linux.x64-21.5.0.0.0dbru.zip
unzip instantclient-basic-linux.x64-21.5.0.0.0dbru.zip
export LD_LIBRARY_PATH=/opt/oracle/instantclient_21_5:$LD_LIBRARY_PATH
Usage¶
Reading result of a SQL query in a dataframe:
import bodo
import pandas as pd
@bodo.jit(distributed=["df"])
def read_oracle(table_name, conn):
    df = pd.read_sql(
        f"SELECT * FROM {table_name}",
        conn,
    )
    return df

table_name = "test_table"
conn = f"oracle+cx_oracle://{username}:{password}@{host}/{db_name}"
df = read_oracle(table_name, conn)
Writing dataframe as a table in the database:
import bodo
import pandas as pd
@bodo.jit(distributed=["df"])
def write_oracle(df, table_name, conn):
    df.to_sql(table_name, conn)

table_name = "test_table"
df = pd.DataFrame({"A": [1.12, 1.1] * 5, "B": [213, -7] * 5})
conn = f"oracle+cx_oracle://{username}:{password}@{host}/{db_name}"
write_oracle(df, table_name, conn)
PostgreSQL¶
Prerequisites¶
In addition to sqlalchemy, install psycopg2.
If you are using Bodo in a conda environment:
If you have installed Bodo using pip:
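The install commands are missing here; they likely resemble the following (on pip, psycopg2-binary is the prebuilt wheel variant of psycopg2, which otherwise requires local build tools):

```shell
# in a conda environment
conda install -c conda-forge psycopg2
# or, if Bodo was installed with pip
pip install psycopg2-binary
```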
Usage¶
Reading result of a SQL query in a dataframe:
import bodo
import pandas as pd
@bodo.jit(distributed=["df"])
def read_postgresql(table_name, conn):
    df = pd.read_sql(
        f"SELECT * FROM {table_name}",
        conn,
    )
    return df

table_name = "test_table"
conn = f"postgresql+psycopg2://{username}:{password}@{host}/{db_name}"
df = read_postgresql(table_name, conn)
Writing dataframe as a table in the database:
import bodo
import pandas as pd
@bodo.jit(distributed=["df"])
def write_postgresql(df, table_name, conn):
    df.to_sql(table_name, conn)

table_name = "test_table"
df = pd.DataFrame({"A": [1.12, 1.1] * 5, "B": [213, -7] * 5})
conn = f"postgresql+psycopg2://{username}:{password}@{host}/{db_name}"
write_postgresql(df, table_name, conn)
Specifying I/O Data Types Manually¶
In some rare use cases, the dataset path cannot be a constant value or a JIT function argument. In such cases, the path is determined dynamically, which prevents automatic Bodo data type inference, so the user has to provide the data types manually.
For example, the names and dtype keyword arguments of pd.read_csv and pd.read_excel allow the user to specify the data types:
@bodo.jit
def example_csv(fname1, fname2, flag):
    if flag:
        file_name = fname1
    else:
        file_name = fname2
    return pd.read_csv(file_name, names=["A", "B", "C"], dtype={"A": int, "B": float, "C": str})
For other pandas read functions, the existing APIs do not currently allow this information to be provided. Users can still provide typing information in the bodo.jit decorator, similar to Numba's typing syntax. For example:
@bodo.jit(locals={"df": {"one": bodo.float64[:],
                         "two": bodo.string_array_type,
                         "three": bodo.bool_[:],
                         "four": bodo.float64[:],
                         "five": bodo.string_array_type,
                         }})
def example_df_schema(fname1, fname2, flag):
    if flag:
        file_name = fname1
    else:
        file_name = fname2
    df = pd.read_parquet(file_name)
    return df


@bodo.jit(locals={"X": bodo.float64[:,:], "Y": bodo.float64[:]})
def example_h5(fname1, fname2, flag):
    if flag:
        file_name = fname1
    else:
        file_name = fname2
    f = h5py.File(file_name, "r")
    X = f["points"][:]
    Y = f["responses"][:]
For the complete list of supported types, please see the pandas dtype section. In the event that the dtypes are improperly specified, Bodo will throw a runtime error.
Warning
Providing data types manually is error-prone and should be avoided as much as possible.