16. Migration from Spark

This section provides guidance for migrating Spark workloads to Bodo, which can unlock orders-of-magnitude performance improvements while keeping the Python code simple and maintainable. Spark APIs usually have simpler Python/Pandas equivalents, which Bodo parallelizes automatically.
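For example, a basic Spark aggregation pipeline typically becomes a plain Pandas function compiled with Bodo's @bodo.jit decorator. The sketch below is illustrative only: the file sales.parquet and the columns amount and region are placeholder assumptions.

import bodo
import pandas as pd

# PySpark version (for reference):
#   df = spark.read.parquet("sales.parquet")
#   df.filter(df.amount > 100).groupBy("region").sum("amount").show()

@bodo.jit
def sales_by_region():
    # Bodo compiles this function and parallelizes the Pandas operations.
    df = pd.read_parquet("sales.parquet")  # placeholder input file
    df = df[df["amount"] > 100]
    totals = df.groupby("region", as_index=False)["amount"].sum()
    print(totals.head())
    return totals

sales_by_region()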

16.1. pyspark.sql.DataFrame

The table below maps Spark DataFrame methods to their Bodo-supported Python/Pandas equivalents. A worked example follows the table.

PySpark Method

Python Equivalent

pyspark.sql.DataFrame.alias()

alias = df

pyspark.sql.DataFrame.approxQuantile()

df[['A', 'B', 'C']].quantile(q)

pyspark.sql.DataFrame.columns

df.columns

pyspark.sql.DataFrame.corr()

df[['A', 'B']].corr()

pyspark.sql.DataFrame.count()

len(df)

pyspark.sql.DataFrame.cov()

df[['A', 'B']].cov()

pyspark.sql.DataFrame.crossJoin()

df1.assign(key=1).merge(df2.assign(key=1), on="key").drop("key", axis=1)

pyspark.sql.DataFrame.describe()

df.describe()

pyspark.sql.DataFrame.distinct()

df.drop_duplicates()

pyspark.sql.DataFrame.drop()

df.drop(col, axis=1)

pyspark.sql.DataFrame.dropDuplicates()

df.drop_duplicates()

pyspark.sql.DataFrame.drop_duplicates()

df.drop_duplicates()

pyspark.sql.DataFrame.dropna()

df.dropna()

pyspark.sql.DataFrame.fillna()

df.fillna(value)

pyspark.sql.DataFrame.filter()

df[cond]

pyspark.sql.DataFrame.first()

df.head(1)

pyspark.sql.DataFrame.foreach()

df.apply(f, axis=1)

pyspark.sql.DataFrame.groupBy()

df.groupby("col")

pyspark.sql.DataFrame.groupby()

df.groupby("col")

pyspark.sql.DataFrame.head()

df.head(n)

pyspark.sql.DataFrame.intersect()

pd.merge(df1[['col1', 'col2']].drop_duplicates(), df2[['col1', 'col2']].drop_duplicates(), on=['col1', 'col2'])

pyspark.sql.DataFrame.intersectAll()

pd.merge(df1[['col1', 'col2']], df2[['col1', 'col2']].drop_duplicates(), on=['col1', 'col2'])

pyspark.sql.DataFrame.join()

df1.join(df2)

pyspark.sql.DataFrame.orderBy()

df.sort_values('colname')

pyspark.sql.DataFrame.show()

print(df.head(n))

pyspark.sql.DataFrame.sort()

df.sort_values('colname')
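
As a worked example of the mappings above, the sketch below translates a small PySpark DataFrame pipeline (filter, dropDuplicates, groupBy, orderBy, show) into the equivalent Pandas code. The price and category columns are placeholder assumptions.

import bodo
import pandas as pd

# PySpark version (for reference):
#   out = (df.filter(df.price > 0)
#            .dropDuplicates()
#            .groupBy("category")
#            .agg({"price": "sum"})
#            .orderBy("category"))
#   out.show(10)

@bodo.jit
def summarize(df):
    df = df[df["price"] > 0]                                      # filter()         -> df[cond]
    df = df.drop_duplicates()                                     # dropDuplicates() -> drop_duplicates()
    out = df.groupby("category", as_index=False)["price"].sum()   # groupBy().agg()  -> groupby().sum()
    out = out.sort_values("category")                             # orderBy()        -> sort_values()
    print(out.head(10))                                           # show(10)         -> print(df.head(10))
    return out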

16.2. pyspark.sql.functions

The table below maps Spark SQL functions to their Bodo-supported Python/Pandas equivalents. A worked example follows the table.

PySpark Function

Python Equivalent

pyspark.sql.functions.abs()

df.col.abs()

pyspark.sql.functions.acos()

np.arccos(df.col)

pyspark.sql.functions.add_months()

df.col + pd.DateOffset(months=num_months)

pyspark.sql.functions.approx_count_distinct()

df.col.nunique()

pyspark.sql.functions.array_distinct()

df.col.map(lambda x: np.unique(x))

pyspark.sql.functions.array_max()

df.col.map(lambda x: np.nanmax(x))

pyspark.sql.functions.array_min()

df.col.map(lambda x: np.nanmin(x))

pyspark.sql.functions.array_position()

df.col.apply(lambda x, value: np.append(np.where(x == value)[0], -1)[0], value=value)
(Note: Python uses 0-based indexing, while Spark's array_position is 1-based.)

pyspark.sql.functions.array_repeat()

df.col.apply(lambda x, count: np.repeat(x, count), count=count)

pyspark.sql.functions.array_union()

df[['col1', 'col2']].apply(lambda x: np.union1d(x[0], x[1]), axis=1)

pyspark.sql.functions.asc()

df.sort_values('col')

pyspark.sql.functions.asc_nulls_first()

df.sort_values('col', na_position='first')

pyspark.sql.functions.asc_nulls_last()

df.sort_values('col')

pyspark.sql.functions.ascii()

df.col.map(lambda x: ord(x[0]))

pyspark.sql.functions.asin()

np.arcsin(df.col)

pyspark.sql.functions.atan()

np.arctan(df.col)

pyspark.sql.functions.atan2()

df[['col1', 'col2']].apply(lambda x: np.arctan2(x[0], x[1]), axis=1)

pyspark.sql.functions.avg()

df.col.mean()

pyspark.sql.functions.bin()

df.col.map(lambda x: "{0:b}".format(x))

pyspark.sql.functions.bitwiseNOT()

np.invert(df.col)

pyspark.sql.functions.bround()

df.col.apply(lambda x, scale: np.round(x, scale), scale=scale)

pyspark.sql.functions.cbrt()

df.col.map(lambda x: np.cbrt(x))

pyspark.sql.functions.ceil()

np.ceil(df.col)

pyspark.sql.functions.col()

df.col

pyspark.sql.functions.collect_list()

df.col.to_numpy()

pyspark.sql.functions.collect_set()

np.unique(df.col.to_numpy())

pyspark.sql.functions.column()

df.col

pyspark.sql.functions.concat()

# Arrays: df[['col1', 'col2', 'col3']].apply(lambda x: np.hstack(x), axis=1)
# Strings: df[['col1', 'col2', 'col3']].apply(lambda x: "".join(x), axis=1)

pyspark.sql.functions.concat_ws()

df[['col1', 'col2', 'col3']].apply(lambda x, sep: sep.join(x), axis=1, sep=sep)

pyspark.sql.functions.conv()

base_map = {2: "{0:b}", 8: "{0:o}", 10: "{0:d}", 16: "{0:x}"}
new_format = base_map[new_base]
df.col.apply(lambda x, old_base, new_format: new_format.format(int(x, old_base)), old_base=old_base, new_format=new_format)

pyspark.sql.functions.cos()

np.cos(df.col)

pyspark.sql.functions.cosh()

np.cosh(df.col)

pyspark.sql.functions.count()

df.col.count()

pyspark.sql.functions.countDistinct()

df.col.drop_duplicates().count()

pyspark.sql.functions.current_date()

datetime.datetime.now().date()

pyspark.sql.functions.current_timestamp()

datetime.datetime.now()

pyspark.sql.functions.date_add()

df.col + pd.tseries.offsets.DateOffset(num_days)

pyspark.sql.functions.date_format()

df.col.dt.strftime(format_str)

pyspark.sql.functions.date_sub()

df.col - pd.tseries.offsets.DateOffset(num_days)

pyspark.sql.functions.date_trunc()

For frequencies of day and below: df.col.dt.floor(freq=trunc_val)
For month: df.col.map(lambda x: pd.Timestamp(year=x.year, month=x.month, day=1))
For year: df.col.map(lambda x: pd.Timestamp(year=x.year, month=1, day=1))

pyspark.sql.functions.datediff()

(df.col1 - df.col2).dt.days

pyspark.sql.functions.dayofmonth()

df.col.dt.day

pyspark.sql.functions.dayofweek()

df.col.dt.dayofweek
(Note: Pandas dayofweek is 0-based starting from Monday, while Spark's dayofweek is 1-based starting from Sunday.)

pyspark.sql.functions.dayofyear()

df.col.dt.dayofyear

pyspark.sql.functions.degrees()

np.degrees(df.col)

pyspark.sql.functions.desc()

df.sort_values('col', ascending=False)

pyspark.sql.functions.desc_nulls_first()

df.sort_values('col', ascending=False, na_position='first')

pyspark.sql.functions.desc_nulls_last()

df.sort_values('col', ascending=False)

pyspark.sql.functions.exp()

np.exp(df.col)

pyspark.sql.functions.expm1()

np.expm1(df.col)

pyspark.sql.functions.factorial()

df.col.map(lambda x: math.factorial(x))

pyspark.sql.functions.floor()

np.floor(df.col)

pyspark.sql.functions.format_number()

df.col.apply(lambda x, d: ("{:,." + str(d) + "f}").format(np.round(x, d)), d=d)

pyspark.sql.functions.format_string()

df.col.apply(lambda x, format_str: format_str.format(x), format_str=format_str)

pyspark.sql.functions.from_unixtime()

df.col.map(lambda x: pd.Timestamp(x, unit='s')).dt.strftime(format_str)

pyspark.sql.functions.hash()

df.col.map(lambda x: hash(x))

pyspark.sql.functions.hour()

df.col.dt.hour

pyspark.sql.functions.hypot()

df[['col1', 'col2']].apply(lambda x: np.hypot(x[0], x[1]), axis=1)

pyspark.sql.functions.initcap()

df.col.str.title()

pyspark.sql.functions.instr()

df.col.str.find(sub=substr)

pyspark.sql.functions.isnan()

np.isnan(df.col)

pyspark.sql.functions.isnull()

df.col.isna()

pyspark.sql.functions.kurtosis()

df.col.kurtosis()

pyspark.sql.functions.last_day()

df.col + pd.tseries.offsets.MonthEnd()

pyspark.sql.functions.least()

df.min(axis=1)

pyspark.sql.functions.locate()

df.col.str.find(sub=substr, start=start)

pyspark.sql.functions.log()

np.log(df.col) / np.log(base)

pyspark.sql.functions.log10()

np.log10(df.col)

pyspark.sql.functions.log1p()

np.log1p(df.col)

pyspark.sql.functions.log2()

np.log2(df.col)

pyspark.sql.functions.lower()

df.col.str.lower()

pyspark.sql.functions.lpad()

df.col.str.pad(length, side='left', fillchar=char)

pyspark.sql.functions.ltrim()

df.col.str.lstrip()

pyspark.sql.functions.max()

df.col.max()

pyspark.sql.functions.mean()

df.col.mean()

pyspark.sql.functions.min()

df.col.min()

pyspark.sql.functions.minute()

df.col.dt.minute

pyspark.sql.functions.monotonically_increasing_id()

pd.Series(np.arange(len(df)))

pyspark.sql.functions.month()

df.col.dt.month

pyspark.sql.functions.nanvl()

df[['A', 'B']].apply(lambda x: x[0] if not pd.isna(x[0]) else x[1], axis=1)

pyspark.sql.functions.overlay()

df.A.str.slice_replace(start=index, stop=index+len, repl=repl_str)

pyspark.sql.functions.pandas_udf()

df.apply(f) or df.col.map(f)

pyspark.sql.functions.pow()

np.power(df.col1, df.col2)

pyspark.sql.functions.quarter()

df.col.dt.quarter

pyspark.sql.functions.radians()

np.radians(df.col)

pyspark.sql.functions.rand()

pd.Series(np.random.rand(len(df)))

pyspark.sql.functions.randn()

pd.Series(np.random.randn(len(df)))

pyspark.sql.functions.regexp_extract()

def f(x, pat):
    res = re.search(pat, x)
    return "" if res is None else res[0]
df.col.apply(f, pat=pat)

pyspark.sql.functions.regexp_replace()

df.col.str.replace(pattern, repl_string, regex=True)

pyspark.sql.functions.repeat()

df.col.str.repeat(count)

pyspark.sql.functions.reverse()

df.col.map(lambda x: x[::-1])

pyspark.sql.functions.rint()

df.col.map(lambda x: int(np.round(x, 0)))

pyspark.sql.functions.round()

df.col.apply(lambda x, decimal_places: np.round(x, decimal_places), decimal_places=decimal_places)

pyspark.sql.functions.rpad()

df.col.str.pad(length, side='right', fillchar=char)

pyspark.sql.functions.rtrim()

df.col.str.rstrip()

pyspark.sql.functions.second()

df.col.dt.second

pyspark.sql.functions.sequence()

df[['col1', 'col2', 'col3']].apply(lambda x: np.arange(x[0], x[1], x[2]), axis=1)

pyspark.sql.functions.shiftLeft()

# If the type is uint64: np.left_shift(df.col.astype(np.int64), numbits).astype(np.uint64)
# Other integer types: np.left_shift(df.col, numbits)

pyspark.sql.functions.shiftRight()

# If the type is uint64: use the shiftRightUnsigned equivalent below
# Other integer types: np.right_shift(df.col, numbits)

pyspark.sql.functions.shiftRightUnsigned()

def shiftRightUnsigned(col, num_bits):
    # NumPy's right shift on int64 is arithmetic (it sign-extends), so mask
    # off the top num_bits bits to emulate Spark's unsigned (zero-fill) shift.
    shifted = np.right_shift(col.astype(np.int64), num_bits)
    mask = np.int64(-1) if num_bits == 0 else np.int64((1 << (64 - num_bits)) - 1)
    return (shifted & mask).astype(np.uint64)
shiftRightUnsigned(df.col, numbits)

pyspark.sql.functions.shuffle()

df.col.map(lambda x: np.random.permutation(x))

pyspark.sql.functions.signum()

np.sign(df.col)

pyspark.sql.functions.sin()

np.sin(df.col)

pyspark.sql.functions.sinh()

np.sinh(df.col)

pyspark.sql.functions.size()

df.col.map(lambda x: len(x))

pyspark.sql.functions.skewness()

df.col.skew()

pyspark.sql.functions.slice()

df.col.map(lambda x: x[start : end])

pyspark.sql.functions.sort_array()

Ascending: df.col.map(lambda x: np.sort(x))
Descending: df.col.map(lambda x: np.sort(x)[::-1])

pyspark.sql.functions.split()

df.col.str.split(pat, n=num_splits)

pyspark.sql.functions.sqrt()

np.sqrt(df.col)

pyspark.sql.functions.stddev()

df.col.std()

pyspark.sql.functions.stddev_pop()

df.col.std(ddof=0)

pyspark.sql.functions.stddev_samp()

df.col.std()

pyspark.sql.functions.substring()

df.col.str.slice(start, start+len)

pyspark.sql.functions.substring_index()

df.col.apply(lambda x, sep, count: sep.join(x.split(sep)[:count]), sep=sep, count=count)

pyspark.sql.functions.sum()

df.col.sum()

pyspark.sql.functions.sumDistinct()

df.col.drop_duplicates().sum()

pyspark.sql.functions.tan()

np.tan(df.col)

pyspark.sql.functions.tanh()

np.tanh(df.col)

pyspark.sql.functions.translate()

df.col.str.split("").apply(lambda x: "".join(pd.Series(x).replace(to_replace, values).tolist()), to_replace=to_replace, values=values)

pyspark.sql.functions.trim()

df.col.str.strip()

pyspark.sql.functions.udf()

df.apply(f, axis=1) or df.col.map(f)

pyspark.sql.functions.upper()

df.col.str.upper()

pyspark.sql.functions.var_pop()

df.col.var(ddof=0)

pyspark.sql.functions.var_samp()

df.col.var()

pyspark.sql.functions.variance()

df.col.var()

pyspark.sql.functions.weekofyear()

df.col.dt.isocalendar().week

pyspark.sql.functions.when()

df.A.apply(lambda a, cond, val, other: val if cond(a) else other, cond=cond, val=val, other=other)

pyspark.sql.functions.year()

df.col.dt.year
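
To tie the function mappings together, the sketch below rewrites a few common pyspark.sql.functions expressions (lower, datediff, when) as Pandas/NumPy operations inside a Bodo-compiled function. The column names are placeholder assumptions, and np.where is used as a concise alternative to the apply-based pattern listed for when().

import bodo
import numpy as np
import pandas as pd

# PySpark version (for reference):
#   from pyspark.sql import functions as F
#   df = (df.withColumn("name_lc", F.lower(df.name))
#           .withColumn("days_open", F.datediff(df.close_dt, df.open_dt))
#           .withColumn("flag", F.when(df.amount > 100, 1).otherwise(0)))

@bodo.jit
def add_columns(df):
    df["name_lc"] = df["name"].str.lower()                        # lower()
    df["days_open"] = (df["close_dt"] - df["open_dt"]).dt.days    # datediff()
    df["flag"] = np.where(df["amount"] > 100, 1, 0)               # when().otherwise()
    return df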