transform operator

When to use the transform operator

The transform operator allows you to implement the T of an ELT system by running a SQL query. Each step of the transform pipeline creates a new table from the SELECT statement and enables tasks to pass those tables as if they were native Python objects.

The transform operator treats values in the double brackets as Airflow jinja templates. You can find more details on templating at Templating.

There are two main uses for the transform operator.

Case 1: Passing tables between tasks while completing data transformations.

The following example applies a SQL SELECT statement to an imdb_movies table with templating and saves the five highest-rated animations to an astro-sdk temporary table.

Note that the input_table in the double brackets is treated as an Airflow jinja template; it is not an f-string. F-strings in SQL formatting are a security risk because they are vulnerable to SQL injection. For security, you must explicitly identify tables in the function parameters by typing a value as a Table. Only then will the transform operator treat the value as a table.

@aql.transform()
def top_five_animations(input_table: Table):  # skipcq: PYL-W0613
    return """
        SELECT *
        FROM {{input_table}}
        WHERE genre1=='Animation'
        ORDER BY rating desc
        LIMIT 5;
    """
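To see why f-strings are unsafe, here is a minimal stdlib sqlite3 demonstration (the table name and rows are invented for illustration). The jinja/Table mechanism exists precisely to avoid splicing raw strings into SQL like this:

```python
import sqlite3

# A toy movies table; the rows are invented for illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE imdb_movies (title TEXT, genre1 TEXT, rating REAL)")
conn.execute("INSERT INTO imdb_movies VALUES ('Up', 'Animation', 8.2)")

# Unsafe: an f-string splices untrusted input straight into the SQL text,
# so the injected "OR 1=1" clause matches every row regardless of genre.
user_input = "'Animation' OR 1=1 --"
unsafe_sql = f"SELECT title FROM imdb_movies WHERE genre1 = {user_input}"
rows_unsafe = conn.execute(unsafe_sql).fetchall()

# Safe: a bound parameter is always treated as a value, never as SQL,
# so the malicious string is compared literally and matches nothing.
rows_safe = conn.execute(
    "SELECT title FROM imdb_movies WHERE genre1 = ?", (user_input,)
).fetchall()
print(rows_unsafe, rows_safe)
```

The unsafe query leaks every row, while the parameterized query returns nothing for the same malicious input.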


The following example applies the same pattern with an ascending sort, saving the five lowest-rated animations to an astro-sdk temporary table.

@aql.transform()
def last_five_animations(input_table: Table):  # skipcq: PYL-W0613
    return """
        SELECT *
        FROM {{input_table}}
        WHERE genre1=='Animation'
        ORDER BY rating asc
        LIMIT 5;
    """


You can easily pass tables between tasks when completing a data transformation.

@aql.transform
def union_top_and_last(first_table: Table, second_table: Table):  # skipcq: PYL-W0613
    """Union `first_table` and `second_table` tables to create a simple dataset."""
    return """
            SELECT title, rating from {{first_table}}
            UNION
            SELECT title, rating from {{second_table}};
            """
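Outside Airflow, what this three-step pipeline computes can be sketched with stdlib sqlite3 (the sample rows are invented; each CREATE TABLE ... AS SELECT stands in for the temporary table a transform step produces):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE imdb_movies (title TEXT, genre1 TEXT, rating REAL)")
conn.executemany(
    "INSERT INTO imdb_movies VALUES (?, ?, ?)",
    [("Up", "Animation", 8.2), ("Coco", "Animation", 8.4),
     ("Cars 2", "Animation", 6.2), ("Heat", "Crime", 8.3)],
)

# Each transform step materializes its SELECT as a new table.
conn.execute("""CREATE TABLE top_five AS
    SELECT * FROM imdb_movies WHERE genre1 = 'Animation'
    ORDER BY rating DESC LIMIT 5""")
conn.execute("""CREATE TABLE last_five AS
    SELECT * FROM imdb_movies WHERE genre1 = 'Animation'
    ORDER BY rating ASC LIMIT 5""")

# The union step then reads both intermediate tables; UNION deduplicates.
rows = conn.execute("""
    SELECT title, rating FROM top_five
    UNION
    SELECT title, rating FROM last_five
    ORDER BY rating DESC""").fetchall()
print(rows)
```

With only three animation rows, both intermediate tables hold the same titles and the UNION deduplicates them into a single result set.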


Case 2: Passing a Pandas dataframe between tasks while completing data transformations.

The following example shows how you can quickly pass a table and a Pandas dataframe between tasks when completing a data transformation.

@aql.transform
def union_table_and_dataframe(input_table: Table, input_dataframe: pd.DataFrame):  # skipcq: PYL-W0613
    """Union `input_table` table and `input_dataframe` dataframe to create a simple dataset."""
    return """
            SELECT title, rating from {{input_table}}
            UNION
            SELECT title, rating from {{input_dataframe}};
            """
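Conceptually, the dataframe must be materialized as a table before the SQL engine can union against it. A rough stdlib sqlite3 sketch of that idea, with a plain list of tuples standing in for the dataframe (names and rows invented for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE movies (title TEXT, rating REAL)")
conn.execute("INSERT INTO movies VALUES ('Up', 8.2)")

# Stand-in for a pandas dataframe: rows held in Python, not yet in the DB.
dataframe_rows = [("Coco", 8.4), ("Up", 8.2)]

# Materialize the in-memory rows as a temporary table first...
conn.execute("CREATE TEMP TABLE input_dataframe (title TEXT, rating REAL)")
conn.executemany("INSERT INTO input_dataframe VALUES (?, ?)", dataframe_rows)

# ...so the union can treat both inputs as ordinary tables.
rows = conn.execute("""
    SELECT title, rating FROM movies
    UNION
    SELECT title, rating FROM input_dataframe
    ORDER BY title""").fetchall()
print(rows)
```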


Note that if you want to run SQL from a file rather than an inline string, use the transform_file operator instead of the transform decorator.

Parameters

  • query_modifier - The query_modifier parameter allows you to define statements to run before and after the main transform statement. For example, to associate a Snowflake query tag, you can use query_modifier=QueryModifier(pre_queries=["ALTER SESSION SET QUERY_TAG='<my-query-tag>'"]).
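The pre/post ordering the parameter describes can be sketched in plain Python with stdlib sqlite3 (run_with_modifier is a hypothetical helper, not part of the SDK, and SQLite has no session query tags, so the pre-queries here simply prepare the input table):

```python
import sqlite3

def run_with_modifier(conn, main_sql, pre_queries=(), post_queries=()):
    """Run pre-queries, then the main statement, then post-queries, in order.
    Mirrors the pre/post semantics described for query_modifier."""
    for q in pre_queries:
        conn.execute(q)
    result = conn.execute(main_sql).fetchall()
    for q in post_queries:
        conn.execute(q)
    return result

conn = sqlite3.connect(":memory:")
rows = run_with_modifier(
    conn,
    "SELECT COUNT(*) FROM staging",
    pre_queries=["CREATE TABLE staging (id INTEGER)",
                 "INSERT INTO staging VALUES (1), (2)"],
    post_queries=["DROP TABLE staging"],
)
print(rows)
```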