dataframe operator

When to use the dataframe operator

The dataframe operator allows you to run Python transformations in Airflow. Behind the scenes, the dataframe function automatically converts the source SQL table into a pandas dataframe and makes any dataframes resulting from the transformation available to downstream astro.sql functions. This means you can seamlessly transition between Python and SQL transformations without writing any conversion code yourself. To use the dataframe operator, provide a Python function that takes a dataframe as one of its inputs and specify a Table object as the input SQL table. If you want the resulting dataframe to be converted back to SQL, specify an output_table object.
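
For example, here is a minimal sketch of that pattern. It assumes the aql and Table imports used in the examples below; the customers table, connection ID, and is_active column are placeholders, not part of the examples that follow:

@aql.dataframe()
def filter_active(df: pd.DataFrame):
    # df is the source SQL table, already loaded as a pandas dataframe
    return df[df["is_active"]]

# Called inside a DAG: the Table argument is converted into the df parameter, and
# because output_table is provided, the returned dataframe is written back to SQL.
filter_active(
    Table(name="customers", conn_id="postgres_default"),
    output_table=Table(name="active_customers", conn_id="postgres_default"),
)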

There are three main uses for the dataframe operator.

Case 1: Convert a SQL table into a dataframe.

import pandas as pd
from astro import sql as aql


@aql.dataframe(columns_names_capitalization="original")
def aggregate_data(df: pd.DataFrame):
    # Count rows per date for each type, producing one lowercase column per type
    new_df = df.pivot_table(index="date", values="name", columns=["type"], aggfunc="count").reset_index()
    new_df.columns = new_df.columns.str.lower()
    return new_df
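
Inside a DAG, you can then call the decorated function with a Table object pointing at the source SQL table; the SDK loads that table into the df argument before the function runs. A sketch of the call (the table and connection names are placeholders):

    aggregate_data(
        Table(name="adoption_reporting_long", conn_id="snowflake_conn"),
    )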


Case 2: Convert the resulting dataframe into a table. When the output_table parameter is specified, the resulting dataframe is written to the specified table.

    # Define a temporary Snowflake table to hold the aggregated results
    snowflake_output_table = Table(
        name="aggregated_adoptions_" + str(int(time.time())),
        metadata=Metadata(
            schema=os.environ["SNOWFLAKE_SCHEMA"],
            database=os.environ["SNOWFLAKE_DATABASE"],
        ),
        conn_id="snowflake_conn",
        temp=True,
    )
    # Because output_table is set, the dataframe returned by aggregate_data
    # is written to the Snowflake table defined above
    aggregate_data(
        cleaned_data,
        output_table=snowflake_output_table,
    )
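
Because output_table is populated when the task runs, the resulting table can feed downstream astro.sql functions. Here is a sketch that assumes the call above is captured in a variable (for example, aggregated = aggregate_data(cleaned_data, output_table=snowflake_output_table)); the count_rows transform is illustrative and not part of the original example:

    @aql.transform
    def count_rows(input_table: Table):
        # {{ input_table }} is templated to the table produced by aggregate_data
        return "SELECT COUNT(*) AS row_count FROM {{ input_table }}"

    count_rows(input_table=aggregated)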

Case 3: Pass the result of a dataframe function as a list or a dictionary. The following example passes a list of dataframes between two tasks; a dictionary sketch follows the DAG.

from datetime import datetime
from typing import List

import pandas as pd

from airflow import DAG
from astro import sql as aql

# START_DATE and the _load_covid_data helper are assumed to be defined elsewhere in this file.


@aql.dataframe(columns_names_capitalization="original")
def load_and_group_covid_data():
    """
    Loads data from a COVID data REST API and then groups values based on the months.
    :return: A list of dataframes, one for each month of the pandemic
    """
    covid_df = _load_covid_data()
    covid_df["Date_YMD"] = covid_df["Date_YMD"].apply(lambda d: datetime.strptime(d, "%Y-%m-%d"))
    return [x for _, x in covid_df.groupby(covid_df.Date_YMD.dt.month)]


@aql.dataframe(columns_names_capitalization="original")
def find_worst_covid_month(dfs: List[pd.DataFrame]):
    """
    Takes a list of dataframes and finds the month with the worst COVID outbreak.
    :param dfs: a list of dataframes containing COVID data
    """
    res = {}
    for covid_month_data in dfs:
        covid_month = covid_month_data.Date_YMD.iloc[0].strftime("%Y-%m")
        num_deceased = covid_month_data["Daily Deceased"].sum()
        res[covid_month] = num_deceased
        print(f"Found {num_deceased} dead for month {covid_month}")
    max_dead_month = max(res, key=res.get)  # type: ignore
    print(f"The worst month was {max_dead_month} with {res[max_dead_month]} dead")


with DAG(
    "example_dataframe",
    schedule_interval=None,
    start_date=START_DATE,
    catchup=False,
) as dag:
    covid_data = load_and_group_covid_data()
    find_worst_covid_month(covid_data)
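
A dictionary works the same way: each dataframe value is passed to the downstream task. Here is a sketch of the dictionary variant (these functions are illustrative, not part of the original example, and assume the same imports plus Dict from typing):

@aql.dataframe(columns_names_capitalization="original")
def group_covid_data_by_month():
    # Same grouping as above, but returned as a dictionary keyed by month number
    covid_df = _load_covid_data()
    covid_df["Date_YMD"] = covid_df["Date_YMD"].apply(lambda d: datetime.strptime(d, "%Y-%m-%d"))
    return {month: df for month, df in covid_df.groupby(covid_df.Date_YMD.dt.month)}


@aql.dataframe(columns_names_capitalization="original")
def print_deceased_per_month(dfs: Dict[int, pd.DataFrame]):
    # Receives the dictionary produced by the upstream task
    for month, covid_month_data in dfs.items():
        print(f"Month {month}: {covid_month_data['Daily Deceased'].sum()} dead")

Both tasks are wired into a DAG the same way as the list example above.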

Default Datasets

  • Input dataset - No default input dataset.

  • Output dataset - Target table of the operator.