dataframe operator

When to use the dataframe operator

The dataframe operator allows you to run Python transformations in Airflow. Behind the scenes, the dataframe function automatically converts the source SQL table into a pandas DataFrame and makes any DataFrames resulting from the transformation available to downstream astro.sql functions. This means you can seamlessly transition between SQL and Python for data transformations without writing any explicit conversion code. To use the dataframe operator, provide a Python function that takes a DataFrame as one of its inputs, and specify a Table object as the input SQL table. If you want the resulting DataFrame converted back to a SQL table, specify an output_table object.

There are two main uses for the dataframe operator.

Case 1: Convert a SQL table into a dataframe.

from astro import sql as aql
import pandas as pd


@aql.dataframe(columns_names_capitalization="original")
def aggregate_data(df: pd.DataFrame):
    # The input Table is converted to a DataFrame automatically;
    # count rows per date, broken out by type
    new_df = df.pivot_table(
        index="date", values="name", columns=["type"], aggfunc="count"
    ).reset_index()
    new_df.columns = new_df.columns.str.lower()
    return new_df
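Because the decorated function body is plain pandas, you can sanity-check the transformation logic outside of Airflow by running it on a small sample frame. A minimal sketch, using hypothetical adoption records with the `date`, `name`, and `type` columns the function expects:

```python
import pandas as pd

# Hypothetical sample data matching the columns aggregate_data expects
df = pd.DataFrame(
    {
        "date": ["2022-01-01", "2022-01-01", "2022-01-02"],
        "name": ["Rex", "Milo", "Luna"],
        "type": ["dog", "cat", "dog"],
    }
)

# Same logic as the decorated function body:
# count adoptions per date, one column per animal type
new_df = df.pivot_table(
    index="date", values="name", columns=["type"], aggfunc="count"
).reset_index()
new_df.columns = new_df.columns.str.lower()
print(new_df)
```

This produces one row per date with a count column for each animal type, which is the shape the downstream table will have.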


Case 2: Convert the resulting dataframe into a table. When the output_table parameter is specified, the resulting DataFrame is written back to the target database as a table.

    # Assumes `cleaned_data` is the output of an upstream astro.sql task,
    # and that os, time, Table, and Metadata are imported
    # (e.g. from astro.table import Metadata, Table)
    aggregate_data(
        cleaned_data,
        output_table=Table(
            name="aggregated_adoptions_" + str(int(time.time())),
            metadata=Metadata(
                schema=os.environ["SNOWFLAKE_SCHEMA"],
                database=os.environ["SNOWFLAKE_DATABASE"],
            ),
            conn_id="snowflake_conn",
        ),
    )