dataframe operator
When to use the dataframe operator
The dataframe operator allows you to run Python transformations in Airflow. Behind the scenes, the dataframe function automatically converts the source SQL table into a pandas dataframe, and makes any dataframes resulting from the transformation available to downstream astro.sql functions. This means you can move between Python and SQL transformations without writing any conversion code yourself. To use the dataframe operator, provide a Python function that takes a dataframe as one of its inputs, and pass a Table object as the input SQL table. If you want the resulting dataframe to be converted back into a SQL table, pass a Table object as the output_table parameter.
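Conceptually, the SQL-to-dataframe hand-off resembles loading a table with pandas and passing the result to an ordinary Python function. The sketch below reproduces that step with pandas and an in-memory SQLite database. It is an illustration under that assumption, not the Astro SDK's internal implementation. The helper run_on_table and the table name adoptions are hypothetical.

```python
import sqlite3

import pandas as pd


def run_on_table(conn: sqlite3.Connection, table_name: str, func):
    """Hypothetical helper: load a SQL table into pandas, then apply a Python function."""
    # table_name is assumed to be a trusted identifier; it is interpolated directly into SQL.
    df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
    return func(df)


conn = sqlite3.connect(":memory:")
# Made-up rows standing in for an existing SQL table.
pd.DataFrame(
    {"name": ["Rex", "Tom", "Kiki"], "type": ["dog", "cat", "cat"]}
).to_sql("adoptions", conn, index=False)

# The function only ever sees a pandas DataFrame, never SQL.
counts = run_on_table(
    conn,
    "adoptions",
    lambda df: df.groupby("type", as_index=False)["name"].count(),
)
print(counts)
```

The function passed in does not need to know where its input came from. The operator is designed around this separation: the transformation code is plain pandas.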
There are three main uses for the dataframe operator.
Case 1: Convert a SQL table into a dataframe.
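import pandas as pd
from astro import sql as aql


@aql.dataframe(columns_names_capitalization="original")
def aggregate_data(df: pd.DataFrame):
    # Count names per date and type, producing one column per type.
    new_df = df.pivot_table(index="date", values="name", columns=["type"], aggfunc="count").reset_index()
    # Normalize the pivoted column names to lowercase.
    new_df.columns = new_df.columns.str.lower()
    return new_df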
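To show what aggregate_data returns, the same pivot can be run directly on a small in-memory dataframe. The sample rows are made up. In a DAG, df would instead be the table passed to aggregate_data.

```python
import pandas as pd

# Hypothetical sample rows standing in for the input SQL table.
df = pd.DataFrame(
    {
        "date": ["2023-01-01", "2023-01-01", "2023-01-01", "2023-01-02", "2023-01-02"],
        "name": ["Rex", "Tom", "Kiki", "Bo", "Mia"],
        "type": ["Dog", "Cat", "Cat", "Dog", "Cat"],
    }
)

# Same body as aggregate_data: one row per date, one count column per type.
new_df = df.pivot_table(index="date", values="name", columns=["type"], aggfunc="count").reset_index()
new_df.columns = new_df.columns.str.lower()
print(new_df)
```

Every date/type combination is present in this sample, so no cell is missing. If a combination were absent, pivot_table would leave a NaN in that cell.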
Case 2: Convert the resulting dataframe into a table. When the output_table parameter is specified, the returned dataframe is written to that table.
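import os
import time

from astro.table import Metadata, Table

snowflake_output_table = Table(
    # A timestamp suffix gives each run a distinct table name.
    name="aggregated_adoptions_" + str(int(time.time())),
    metadata=Metadata(
        schema=os.environ["SNOWFLAKE_SCHEMA"],
        database=os.environ["SNOWFLAKE_DATABASE"],
    ),
    conn_id="snowflake_conn",
    temp=True,
)

# cleaned_data is the output of an upstream task (not shown here).
aggregate_data(
    cleaned_data,
    output_table=snowflake_output_table,
)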
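Setting output_table means the returned dataframe is materialized as a SQL table that later SQL steps can query. The sketch below mimics that step using pandas DataFrame.to_sql and an in-memory SQLite database in place of Snowflake. SQLite and the sample result are assumptions for illustration only. The sketch does not show how the Astro SDK writes the table.

```python
import sqlite3
import time

import pandas as pd

conn = sqlite3.connect(":memory:")

# Hypothetical result of a dataframe function such as aggregate_data.
result = pd.DataFrame({"date": ["2023-01-01", "2023-01-02"], "cat": [2, 1], "dog": [1, 1]})

# Same naming pattern as snowflake_output_table: a Unix-timestamp suffix.
table_name = "aggregated_adoptions_" + str(int(time.time()))
result.to_sql(table_name, conn, index=False)

# Downstream SQL can now query the materialized table directly.
rows = conn.execute(f"SELECT date, cat, dog FROM {table_name} ORDER BY date").fetchall()
print(rows)
```

The timestamp suffix keeps reruns from colliding with a table left by an earlier run. This works as long as two runs do not start within the same second.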
Case 3: Pass the result of a dataframe function as a list or a dictionary.
from datetime import datetime
from typing import List

import pandas as pd
from airflow import DAG
from astro import sql as aql

# _load_covid_data, ALLOWED_DESERIALIZATION_CLASSES, START_DATE and default_args
# are defined elsewhere in the full example DAG and are not shown here.


@aql.dataframe(columns_names_capitalization="original")
def load_and_group_covid_data():
    """
    Loads data from a COVID data REST API and then groups values based on the months.
    :return: A list of dataframes for each month of the pandemic
    """
    covid_df = _load_covid_data()
    covid_df["Date_YMD"] = covid_df["Date_YMD"].apply(lambda d: datetime.strptime(d, "%Y-%m-%d"))
    # One dataframe per calendar month number; the list is passed downstream as-is.
    return [x for _, x in covid_df.groupby(covid_df.Date_YMD.dt.month)]


@aql.dataframe(columns_names_capitalization="original")
def find_worst_covid_month(dfs: List[pd.DataFrame]):
    """
    Takes a list of dataframes and then finds the month with the worst covid outbreak
    :param dfs: a list of DFs containing COVID data
    """
    res = {}
    for covid_month_data in dfs:
        if ALLOWED_DESERIALIZATION_CLASSES == "airflow\\.* astro\\.*":
            # In this branch the date is treated as epoch milliseconds (hence / 1e3).
            covid_month = datetime.fromtimestamp(covid_month_data.Date_YMD.iloc[0] / 1e3).strftime("%Y-%m")
        else:
            covid_month = covid_month_data.Date_YMD.iloc[0].strftime("%Y-%m")
        num_deceased = covid_month_data["Daily Deceased"].sum()
        res[covid_month] = num_deceased
        print(f"Found {num_deceased} dead for month {covid_month}")
    max_dead_month = max(res, key=res.get)  # type: ignore
    print(f"The worst month was {max_dead_month} with {res[max_dead_month]} dead")


with DAG(
    "example_dataframe",
    schedule_interval=None,
    start_date=START_DATE,
    catchup=False,
    default_args=default_args,
) as dag:
    covid_data = load_and_group_covid_data()
    find_worst_covid_month(covid_data)
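The Case 3 heading also mentions dictionaries, but the example above only returns a list. Below is a minimal pandas-only sketch of the dictionary shape, with made-up data in place of _load_covid_data(). It groups by year and month ("YYYY-MM") rather than by month number alone. The example's .dt.month grouping would merge, for instance, March 2020 with March 2021. The sketch uses pd.to_datetime instead of datetime.strptime. That the operator forwards a dict to downstream tasks the same way it forwards a list is taken from the heading, not verified here.

```python
import pandas as pd

# Hypothetical stand-in for _load_covid_data(): a few daily rows spanning two months.
covid_df = pd.DataFrame(
    {
        "Date_YMD": ["2020-03-30", "2020-03-31", "2020-04-01", "2020-04-02"],
        "Daily Deceased": [10, 20, 5, 40],
    }
)
covid_df["Date_YMD"] = pd.to_datetime(covid_df["Date_YMD"], format="%Y-%m-%d")

# Dictionary variant: key each monthly dataframe by "YYYY-MM" instead of returning a list.
monthly = {
    month: group
    for month, group in covid_df.groupby(covid_df["Date_YMD"].dt.strftime("%Y-%m"))
}

# Downstream logic mirrors find_worst_covid_month, but the month label comes from the key.
deaths = {month: int(group["Daily Deceased"].sum()) for month, group in monthly.items()}
worst_month = max(deaths, key=deaths.get)
print(deaths, worst_month)
```

Keying by month also removes the need to recover the month label from the first row of each dataframe, which is what the date-deserialization branch in find_worst_covid_month handles.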
Default Datasets
Input dataset - No default input dataset.
Output dataset - Target table of the operator.