run_raw_sql operator
When to use the run_raw_sql operator
The run_raw_sql operator lets you run any SQL statement, using the same Astro SDK templating that is available in the transform operator. By default, this operator returns None, but you can define the task output with the handler argument. For example, you may want to return a list of the rows produced by a query.
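The sketch below illustrates the difference between the default and handler behavior. It uses Python's built-in sqlite3 in place of a real warehouse. `run_sql_sketch` is a hypothetical helper and is not part of the Astro SDK. It only mimics the documented contract: with no handler the result is None, and with a handler the result is whatever the handler returns. The SDK passes the handler its own database result object, not a sqlite3 cursor. The sketch assumes only that this object offers a `fetchall()` method.

```python
import sqlite3
from typing import Any, Callable, Optional


def run_sql_sketch(
    conn: sqlite3.Connection,
    sql: str,
    handler: Optional[Callable[[sqlite3.Cursor], Any]] = None,
) -> Any:
    """Hypothetical stand-in for run_raw_sql's return behavior.

    Executes `sql`; returns None unless a handler is given, in which
    case the handler's return value becomes the result.
    """
    cursor = conn.execute(sql)
    if handler is None:
        return None
    return handler(cursor)


def handle_result(result):
    # Same shape as a handler that returns every row of the query
    return result.fetchall()


conn = sqlite3.connect(":memory:")
run_sql_sketch(conn, "CREATE TABLE ads (id INTEGER)")
conn.executemany("INSERT INTO ads (id) VALUES (?)", [(1,), (2,)])

no_handler = run_sql_sketch(conn, "SELECT id FROM ads")
with_handler = run_sql_sketch(conn, "SELECT id FROM ads ORDER BY id", handler=handle_result)
print(no_handler)    # None
print(with_handler)  # [(1,), (2,)]
```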
The run_raw_sql function also treats values in double brackets as Airflow Jinja templates. For more details on templating, see Templating.
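At its core, the double-bracket syntax is placeholder substitution. The renderer below is a hypothetical, regex-based sketch that shows only that idea. It is not Jinja, and it is not how the Astro SDK or Airflow actually renders templates. Airflow uses Jinja, which also supports expressions, filters, and built-in variables such as `{{ ds }}`. To keep the example small, names without a supplied value are left untouched.

```python
import re
from typing import Mapping

# Matches {{name}} or {{ name }}; a deliberately tiny subset of Jinja syntax
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_placeholders(sql: str, values: Mapping[str, str]) -> str:
    """Hypothetical illustration: replace each {{name}} with values[name].

    Not Jinja and not the SDK's renderer; unknown names are kept as-is.
    """
    def substitute(match) -> str:
        name = match.group(1)
        return values.get(name, match.group(0))

    return PLACEHOLDER.sub(substitute, sql)


query = "select id from {{table}} where dt = '{{ ds }}'"
print(render_placeholders(query, {"table": "my_dataset.ads"}))
# select id from my_dataset.ads where dt = '{{ ds }}'
```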
This example shows how to create a Snowflake table using run_raw_sql without the handler argument. This task returns None.
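```python
from datetime import datetime

from airflow.decorators import dag
from astro.sql import run_raw_sql
from astro.table import Table

SNOWFLAKE_CONN_ID = "snowflake_conn"  # Airflow connection ID; adjust to your setup


@run_raw_sql
def create_table(table: Table):
    """Create the reporting data which will be the target of the append method"""
    return """
      CREATE OR REPLACE TABLE {{table}} (
        sell number,
        list number,
        variable varchar,
        value number
      );
    """


@dag(start_date=datetime(2021, 12, 1), schedule_interval="@daily", catchup=False)
def example_snowflake_partial_table_with_append():
    homes_reporting = Table(conn_id=SNOWFLAKE_CONN_ID)
    # No handler is set, so this task's output is None
    create_results_table = create_table(table=homes_reporting, conn_id=SNOWFLAKE_CONN_ID)


example_snowflake_partial_table_with_append()
```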
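In Snowflake, `CREATE OR REPLACE TABLE` drops any existing table with the same name and creates a new one. Because of this, a rerun of the daily task does not fail just because the table already exists. The sketch below reproduces that behavior without a Snowflake account by running an equivalent statement against an in-memory SQLite database. SQLite has no `CREATE OR REPLACE TABLE`, so the sketch assumes `DROP TABLE IF EXISTS` followed by `CREATE TABLE` as a stand-in. The function `create_or_replace_reporting_table` is hypothetical.

```python
import sqlite3


def create_or_replace_reporting_table(conn: sqlite3.Connection, table_name: str) -> None:
    """Hypothetical SQLite emulation of the Snowflake DDL in the example.

    SQLite has no CREATE OR REPLACE TABLE, so the table is dropped (if it
    exists) and recreated. Like the task without a handler, this returns None.
    The table name is interpolated directly, so it must be trusted input.
    """
    conn.executescript(
        f"""
        DROP TABLE IF EXISTS {table_name};
        CREATE TABLE {table_name} (
            sell number,
            list number,
            variable varchar,
            value number
        );
        """
    )


conn = sqlite3.connect(":memory:")
# Running the DDL twice does not fail, similar to a daily re-run of the DAG
create_or_replace_reporting_table(conn, "homes_reporting")
create_or_replace_reporting_table(conn, "homes_reporting")
columns = [row[1] for row in conn.execute("PRAGMA table_info(homes_reporting)")]
print(columns)  # ['sell', 'list', 'variable', 'value']
```

Note that replacing the table also discards its existing rows.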
This example shows how to run a SELECT query in BigQuery and return its rows using the handler argument. This task returns the results of the query.
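```python
from datetime import datetime

from airflow import DAG
from astro import sql as aql
from astro.files import File
from astro.table import Metadata, Table

# Placeholder settings; replace with your own bucket, dataset and connection
ASTRO_S3_BUCKET = "s3://my-bucket"
ASTRO_BIGQUERY_DATASET = "my_dataset"
ASTRO_GCP_CONN_ID = "google_cloud_default"


def handle_result(result):
    # The handler's return value becomes the task output: here, all result rows
    return result.fetchall()


with DAG(
    dag_id="example_dynamic_map_task",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["airflow_version:2.3.0"],
) as dag:

    @aql.run_raw_sql(handler=handle_result)
    def get_campaigns(table: Table):
        return """select id from {{table}}"""

    # Load ads.csv from S3 into a BigQuery table in ASTRO_BIGQUERY_DATASET
    bq_table = aql.load_file(
        input_file=File(path=f"{ASTRO_S3_BUCKET}/ads.csv"),
        output_table=Table(
            metadata=Metadata(
                schema=ASTRO_BIGQUERY_DATASET,
            ),
            conn_id=ASTRO_GCP_CONN_ID,
        ),
        use_native_support=False,
    )

    # The task output is whatever handle_result returns
    ids = get_campaigns(bq_table)
```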
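`handle_result` returns the raw rows. With a typical DB-API or SQLAlchemy result, each element is therefore a row object, not a bare ID. If downstream tasks only need the values, the handler can reshape them before they become the task output. The sketch below defines a hypothetical handler, `handle_ids`, and exercises it against SQLite with made-up rows. It assumes only that the handler's argument has a `fetchall()` method, as `handle_result` already expects.

```python
import sqlite3
from typing import List


def handle_ids(result) -> List[int]:
    """Hypothetical handler: flatten single-column rows into plain values.

    `result` is assumed to expose fetchall(), like the object handle_result receives.
    """
    return [row[0] for row in result.fetchall()]


# SQLite stands in for BigQuery; these rows are made up for illustration
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ads (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO ads VALUES (?, ?)", [(3, "c"), (1, "a"), (2, "b")])

ids = handle_ids(conn.execute("select id from ads order by id"))
print(ids)  # [1, 2, 3]
```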