6. File I/O¶
Bodo automatically parallelizes I/O of different nodes in a distributed setting without any code changes.
6.1. Supported formats¶
Currently, Bodo supports I/O for Parquet, CSV, SQL, JSON, HDF5 , and Numpy binaries formats. Also see Supported Pandas Operations for supported arguments.
6.1.1. Parquet¶
For Parquet, the syntax is the same as Pandas:
pd.read_parquet(path)
, where path can be a parquet file or a directory with multiple parquet files
(all are part of the same dataframe):
@bodo.jit
def write_pq(df):
df.to_parquet('example.pq')
@bodo.jit
def read_pq():
df = pd.read_parquet('example.pq')
return df
to_parquet(name)
with distributed data writes to a folder called name
.
Each process writes one file into the folder, but if the data is not distributed,
to_parquet(name)
writes to a single file called name
:
df = pd.DataFrame({'A': np.arange(n)})
@bodo.jit
def example1_pq(df):
df.to_parquet('example1.pq')
@bodo.jit(distributed={'df'})
def example2_pq(df):
df.to_parquet('example2.pq')
if bodo.get_rank() == 0:
example1_pq(df)
example2_pq(df)
Run the code above with 4 processors:
$ mpiexec -n 4 python example_pq.py
example1_pq(df)
writes 1 single file, and example2_pq(df)
writes a folder containing 4 parquet files:
.
├── example1.pq
├── example2.pq
│ ├── part-00.parquet
│ ├── part-01.parquet
│ ├── part-02.parquet
│ └── part-03.parquet
See read_parquet(), to_parquet() for supported arguments.
6.1.2. CSV¶
For CSV, the syntax is also the same as Pandas:
@bodo.jit
def write_csv(df):
df.to_csv('example.csv')
@bodo.jit
def read_csv():
df = pd.read_csv('example.csv')
return df
Unlike Pandas’ read_csv
, Bodo can read a directory that contains multiple partitioned CSV files as well.
Note
Bodo uses nullable integer types of Pandas to ensure type stability (see Integer NA issue in Pandas for more details). Therefore, data types must be specified explicitly for accurate performance comparisons of Bodo and Pandas for read_csv
.
to_csv(name)
has different behaviors for different file systems:
1. POSIX file systems: always writes to a single file, regardless of the number of processes and whether the data is distributed, but writing is still done in parallel when more than 1 processor is used:
df = pd.DataFrame({'A': np.arange(n)}) @bodo.jit def example1_csv(df): df.to_csv('example1.csv') @bodo.jit(distributed={'df'}) def example2_csv(df): df.to_csv('example2.csv') if bodo.get_rank() == 0: example1_csv(df) example2_csv(df)Run the code above with 4 processors:
$ mpiexec -n 4 python example_csv.pyeach
example1_csv(df)
andexample2_csv(df)
writes to a single file:. ├── example1.csv ├── example2.csv2. S3 and HDFS: distributed data is written to a folder called
name
. Each process writes one file into the folder, but if the data is not distributed,to_csv(name)
writes to a single file calledname
:df = pd.DataFrame({'A': np.arange(n)}) @bodo.jit def example1_csv(df): df.to_csv('s3://bucket-name/example1.csv') @bodo.jit(distributed={'df'}) def example2_csv(df): df.to_csv('s3://bucket-name/example2.csv') if bodo.get_rank() == 0: example1_csv(df) example2_csv(df)Run the code above with 4 processors:
$ mpiexec -n 4 python example_csv.py
example1_csv(df)
writes 1 single file, andexample2_csv(df)
writes a folder containing 4 csv files:. ├── example1.csv ├── example2.csv │ ├── part-00.csv │ ├── part-01.csv │ ├── part-02.csv │ └── part-03.csv
See read_csv(), to_csv() for supported arguments.
6.1.3. JSON¶
For JSON, the syntax is also the same as Pandas:
@bodo.jit
def example_write_json(df, fname):
df.to_json(fname)
@bodo.jit
def example_read_json_lines_format():
df = pd.read_json('example.json', orient = 'records', lines = True)
@bodo.jit
def example_read_json_multi_lines():
# dtype argument required when reading a regular multi-line JSON file
# cannot read a directory containing multiple multi-line JSON files
df = pd.read_json('example_file.json', orient = 'records', lines = False,
dtype={"A": np.float, "B": "bool", "C": np.int})
to_json(name)
has different behaviors for different file systems:
- POSIX file systems:
to_json(name)
behavior depends onorient
andlines
arguments.(1)
DataFrame.to_json(name, orient='records', lines=True)
(i.e. writing JSON Lines text file format) always writes to a single file, regardless of the number of processes and whether the data is distributed, but writing is still done in parallel when more than 1 processor is used:df = pd.DataFrame({'A': np.arange(n)}) @bodo.jit def example1_json(df): df.to_json('example1.json', orient='records', lines=True) @bodo.jit(distributed={'df'}) def example2_json(df): df.to_json('example2.json', orient='records', lines=True) if bodo.get_rank() == 0: example1_json(df) example2_jsons(df)Run the code above with 4 processors:
$ mpiexec -n 4 python example_json.pyeach
example1_json(df)
andexample2_json(df)
writes to a single file:. ├── example1.json ├── example2.json2. S3 and HDFS: distributed data is written to a folder called
name
. Each process writes one file into the folder, but if the data is not distributed,to_json(name)
writes to a file calledname
:df = pd.DataFrame({'A': np.arange(n)}) @bodo.jit def example1_json(df): df.to_json('s3://bucket-name/example1.json') @bodo.jit(distributed={'df'}) def example2_json(df): df.to_json('s3://bucket-name/example2.json') if bodo.get_rank() == 0: example1_json(df) example2_json(df)Run the code above with 4 processors:
$ mpiexec -n 4 python example_json.py
example1_json(df)
writes 1 single file, andexample2_json(df)
writes a folder containing 4 json files:. ├── example1.json ├── example2.json │ ├── part-00.json │ ├── part-01.json │ ├── part-02.json │ └── part-03.json
See read_json(), to_json() for supported arguments.
6.1.4. SQL¶
For SQL, the syntax is also the same as Pandas. For reading:
@bodo.jit
def example_read_sql():
df = pd.read_sql('select * from employees', 'mysql+pymysql://admin:server')
See read_sql() for supported arguments.
For writing:
@bodo.jit
def example_write_sql(df):
df.to_sql('table_name', 'mysql+pymysql://admin:server')
See to_sql() for supported arguments.
6.1.5. Numpy binaries¶
Numpy’s fromfile
and tofile
are supported as below:
@bodo.jit
def example_np_io():
A = np.fromfile("myfile.dat", np.float64)
...
A.tofile("newfile.dat")
Bodo has the same behavior as Numpy for numpy.ndarray.tofile()
, where we always write to a single file.
However, writing distributed data to POSIX is done in parallel,
but writing to S3 & HDFS is done sequentially (due to file system limitations).
6.2. Input array types¶
Bodo needs to know the types of input arrays. If the file name is a constant string or function argument, Bodo tries to look at the file at compile time and recognize the types. Otherwise, the user is responsible for providing the types similar to Numba’s typing syntax. For example:
@bodo.jit(locals={'df':{'one': bodo.float64[:],
'two': bodo.string_array_type,
'three': bodo.bool_[:],
'four': bodo.float64[:],
'five': bodo.string_array_type,
}})
def example_df_schema(fname1, fname2, flag):
if flag:
file_name = fname1
else:
file_name = fname2
df = pd.read_parquet(file_name)
@bodo.jit(locals={'X': bodo.float64[:,:], 'Y': bodo.float64[:]})
def example_h5(fname1, fname2, flag):
if flag:
file_name = fname1
else:
file_name = fname2
f = h5py.File(file_name, "r")
X = f['points'][:]
Y = f['responses'][:]
6.3. File Systems¶
6.3.1. Amazon S3¶
Reading and writing CSV, Parquet, JSON, and Numpy binary files from and to Amazon S3 is supported.
The s3fs
package must be available, and the file path should start with s3://
:
@bodo.jit
def example_s3_parquet():
df = pd.read_parquet('s3://bucket-name/file_name.parquet')
- These environment variables are used for File I/O with S3 credentials:
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
AWS_DEFAULT_REGION
: default asus-east-1
AWS_S3_ENDPOINT
: specify custom host name, default as AWS endpoint(s3.amazonaws.com
)
Bodo uses Apache Arrow internally for read and write of data on S3.
6.3.2. Hadoop Distributed File System (HDFS) and Azure Data Lake Storage (ADLS) Gen2¶
Reading and writing CSV, Parquet, JSON, and Numpy binary files from and to Hadoop Distributed File System (HDFS) is supported. Note that Azure Data Lake Storage Gen2 can be accessed through HDFS.
The file path should start with hdfs://
or abfs[s]://
:
@bodo.jit
def example_hdfs_parquet():
df = pd.read_parquet('hdfs://host:port/dir/file_name.pq')
- These environment variables are used for File I/O with HDFS:
HADOOP_HOME
: the root of your installed Hadoop distribution. Often has lib/native/libhdfs.so.ARROW_LIBHDFS_DIR
: location of libhdfs. Often is$HADOOP_HOME/lib/native
.CLASSPATH
: must contain the Hadoop jars. You can set these using:export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob`
Bodo uses Apache Arrow internally for read and write of data on HDFS. $HADOOP_HOME/etc/hadoop/hdfs-site.xml
provides default behaviors for the HDFS client used by Bodo. Inconsistent configurations (e.g. dfs.replication
) could potentially cause errors in Bodo programs.