transform operator

When to use the transform operator
The transform operator allows you to implement the T of an ELT system by running a SQL query. Each step of the transform pipeline creates a new table from the SELECT statement and enables tasks to pass those tables as if they were native Python objects.
The transform operator treats values in double brackets as Airflow jinja templates. You can find more details on templating at Templating.
There are two main uses for the transform operator.
- Case 1: Passing tables between tasks while completing data transformations.
The following example applies a SQL SELECT statement to an `imdb_movies` table with templating and saves the result to a `top_animation` table. Note that `input_table` in the double brackets is treated as an Airflow jinja template; it is not an f-string. F-strings in SQL formatting are at risk of security breaches via SQL injection. For security, you must explicitly identify tables in the function parameters by typing a value as a `Table`. Only then will the transform operator treat the value as a table.

```python
@aql.transform()
def top_five_animations(input_table: Table):  # skipcq: PYL-W0613
    return """
        SELECT *
        FROM {{input_table}}
        WHERE genre1=='Animation'
        ORDER BY rating desc
        LIMIT 5;
    """
```
The following example applies a SQL SELECT statement to an `imdb_movies` table with templating and saves the result to a `last_animation` table.

```python
@aql.transform()
def last_five_animations(input_table: Table):  # skipcq: PYL-W0613
    return """
        SELECT *
        FROM {{input_table}}
        WHERE genre1=='Animation'
        ORDER BY rating asc
        LIMIT 5;
    """
```
You can easily pass tables between tasks when completing a data transformation.
```python
@aql.transform
def union_top_and_last(first_table: Table, second_table: Table):  # skipcq: PYL-W0613
    """Union `first_table` and `second_table` tables to create a simple dataset."""
    return """
        SELECT title, rating from {{first_table}}
        UNION
        SELECT title, rating from {{second_table}};
    """
```
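Once the `{{first_table}}` and `{{second_table}}` placeholders are rendered, the query behaves like any SQL UNION. A minimal stdlib `sqlite3` sketch, using hypothetical `top_animation` and `last_animation` tables in place of the upstream task outputs, shows the combined, deduplicated result:

```python
import sqlite3

# Hypothetical stand-ins for the tables produced by the upstream tasks.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE top_animation (title TEXT, rating REAL)")
conn.execute("CREATE TABLE last_animation (title TEXT, rating REAL)")
conn.executemany("INSERT INTO top_animation VALUES (?, ?)",
                 [("Up", 8.3), ("Coco", 8.4)])
conn.executemany("INSERT INTO last_animation VALUES (?, ?)",
                 [("Coco", 8.4), ("Cars 2", 6.1)])

# The same query body the transform task returns, with the template
# placeholders already rendered to concrete table names.
rows = conn.execute("""
    SELECT title, rating from top_animation
    UNION
    SELECT title, rating from last_animation;
""").fetchall()

# UNION (without ALL) deduplicates the row shared by both tables.
print(sorted(rows))  # [('Cars 2', 6.1), ('Coco', 8.4), ('Up', 8.3)]
```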
- Case 2: Passing a Pandas dataframe between tasks while completing data transformations.
The following example shows how you can quickly pass a table and a Pandas dataframe between tasks when completing a data transformation.
```python
@aql.transform
def union_table_and_dataframe(input_table: Table, input_dataframe: pd.DataFrame):  # skipcq: PYL-W0613
    """Union `union_table` table and `input_dataframe` dataframe to create a simple dataset."""
    return """
        SELECT title, rating from {{input_table}}
        UNION
        SELECT title, rating from {{input_dataframe}};
    """
```
Note that if you want to pass a SQL file to the transform decorator, you should use the transform_file operator instead.