dataframe operator

When to use the dataframe operator
The `dataframe` operator allows you to run Python transformations in Airflow. Behind the scenes, the `dataframe` function automatically converts the source SQL table into a Pandas dataframe and makes any dataframes resulting from the transformation available to downstream `astro.sql` functions. This means you can seamlessly transition between Python and SQL for data transformations without writing any code to explicitly do so. To use the `dataframe` operator, provide a Python function that takes a dataframe as one of its inputs, and specify a `Table` object as the input SQL table. If you want the resulting dataframe to be converted back to SQL, specify an `output_table` object.

There are two main uses for the `dataframe` operator.
Case 1: Convert a SQL table into a dataframe.

```python
@aql.dataframe(columns_names_capitalization="original")
def aggregate_data(df: pd.DataFrame):
    new_df = df.pivot_table(
        index="date", values="name", columns=["type"], aggfunc="count"
    ).reset_index()
    new_df.columns = new_df.columns.str.lower()
    return new_df
```
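Because the decorated function receives an ordinary Pandas dataframe, the pivot logic can be sanity-checked outside of Airflow with plain pandas. The sample data below is illustrative only (the column names `date`, `name`, and `type` match the function above, but the values are invented):

```python
import pandas as pd

# Illustrative sample of the kind of table aggregate_data expects.
df = pd.DataFrame(
    {
        "date": ["2022-01-01", "2022-01-01", "2022-01-02"],
        "name": ["Rex", "Whiskers", "Fido"],
        "type": ["dog", "cat", "dog"],
    }
)

# Same transformation as the aggregate_data body: count names per date and type.
new_df = df.pivot_table(
    index="date", values="name", columns=["type"], aggfunc="count"
).reset_index()
new_df.columns = new_df.columns.str.lower()

# One row per date, one count column per animal type.
print(new_df)
```

Running the function body this way makes it easy to iterate on the transformation before wiring it into a DAG.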
Case 2: Convert the resulting dataframe into a table. When the `output_table` parameter is specified, the resulting dataframe is turned into a table.

```python
aggregate_data(
    cleaned_data,
    output_table=Table(
        name="aggregated_adoptions_" + str(int(time.time())),
        metadata=Metadata(
            schema=os.environ["SNOWFLAKE_SCHEMA"],
            database=os.environ["SNOWFLAKE_DATABASE"],
        ),
        conn_id="snowflake_conn",
    ),
)
```