run_raw_sql operator
When to use the run_raw_sql operator
The run_raw_sql operator allows you to declare any SQL statement using the Astro SDK templating available in the transform operator. By default, this operator returns None, but you can define the task output by using the handler argument. For example, you may wish to return a list with the row results of a query.
The run_raw_sql function also treats values in double curly braces ({{ }}) as Airflow Jinja templates. You can find more details on templating at Templating.
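To make the placeholder idea concrete, the sketch below fills double-brace placeholders in a SQL string using only the standard library. It is a toy illustration: render_placeholders is a hypothetical helper, not part of the Astro SDK or Jinja, and the real operator resolves Table objects and other parameters itself.

```python
import re


# Hypothetical helper for illustration only; not part of the Astro SDK or Jinja.
def render_placeholders(sql: str, values: dict) -> str:
    """Replace each {{name}} placeholder in sql with str(values[name])."""

    def substitute(match):
        # group(1) is the placeholder name without the braces or padding.
        return str(values[match.group(1)])

    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, sql)


rendered = render_placeholders(
    "SELECT id FROM {{table}}",
    {"table": "homes_reporting"},
)
print(rendered)
```

The point is only that the statement you return from the decorated function is a template, and the placeholder is resolved before the statement reaches the database.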
This example shows how you can create a Snowflake table using run_raw_sql without the handler. This task will return None.
@run_raw_sql
def create_table(table: Table):
    """Create the reporting data which will be the target of the append method"""
    return """
    CREATE OR REPLACE TABLE {{table}} (
      sell number,
      list number,
      variable varchar,
      value number
    );
    """


@dag(
    start_date=datetime(2021, 12, 1),
    schedule_interval=None,
    catchup=False,
    default_args={
        "email_on_failure": False,
        "retries": 1,
        "retry_delay": timedelta(seconds=5),
    },
)
def example_snowflake_partial_table_with_append():
    homes_reporting = Table(name="homes_reporting", temp=True, conn_id=SNOWFLAKE_CONN_ID)
    create_results_table = create_table(table=homes_reporting, conn_id=SNOWFLAKE_CONN_ID)
This example shows how you can run a SELECT query in BigQuery and return rows using the handler argument. This task will return the results of the query.
def handle_result(result):
    return result.fetchall()


with DAG(
    dag_id="example_dynamic_map_task",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["airflow_version:2.3.0"],
    default_args=default_args,
) as dag:

    @aql.run_raw_sql(handler=handle_result)
    def get_campaigns(table: Table):
        return """select id from {{table}}"""

    bq_table = aql.load_file(
        input_file=File(path=f"{ASTRO_S3_BUCKET}/ads.csv"),
        output_table=Table(
            metadata=Metadata(
                schema=ASTRO_BIGQUERY_DATASET,
            ),
            conn_id=ASTRO_GCP_CONN_ID,
        ),
        use_native_support=False,
    )

    ids = get_campaigns(bq_table)
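To see what a handler of this shape does in isolation, the sketch below calls the same handle_result function on a cursor from Python's built-in sqlite3 module. The in-memory SQLite database and the ads table are assumptions used as a stand-in. The object the operator actually passes to the handler depends on the target database.

```python
import sqlite3


def handle_result(result):
    # Same shape as the handler above: take a cursor-like object, return all rows.
    return result.fetchall()


# In-memory SQLite database, used only as a stand-in for the real warehouse.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE ads (id INTEGER)")
connection.executemany("INSERT INTO ads (id) VALUES (?)", [(1,), (2,), (3,)])

cursor = connection.execute("SELECT id FROM ads ORDER BY id")
rows = handle_result(cursor)  # a list with one tuple per row
connection.close()

print(rows)
```

The value returned by the handler is what the task hands to downstream tasks, so the handler is the place to decide how much of the result set to materialize.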
Parameters
handler - A callback that receives the cursor object returned by the database after the statement runs. The value the callback returns becomes the task output.
results_format - A shortcut for the most common result shapes, so that you do not need to write a handler for them.
    List - If you expect the query to return a list of rows, pass results_format='list' instead of a handler that calls cursor.fetchall().
    Pandas Dataframe - If you expect the query result to be converted to a Pandas DataFrame, pass results_format='pandas_dataframe'.
fail_on_empty - A handler can raise an exception when the database returns no data and the handler calls fetchall(). To prevent the exception, pass fail_on_empty=False. The default value for this parameter is True.
query_modifier - The query_modifier parameter allows you to define statements to run before and after the run_raw_sql main statement. For example, to associate a Snowflake query tag, you can use query_modifier=QueryModifier(pre_queries=["ALTER SESSION SET QUERY_TAG=<my-query-tag>"]).
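The situation that fail_on_empty addresses can be sketched without a database. In the example below, NoRowsCursor and call_handler are hypothetical names invented for illustration. They mimic a result object that has no rows to fetch and a wrapper that either re-raises or tolerates the handler's failure. This is not the Astro SDK's implementation, and the exception type raised by a real database driver may differ.

```python
class NoRowsCursor:
    """Hypothetical stand-in for a cursor whose statement produced no result set."""

    def fetchall(self):
        raise RuntimeError("This result object does not return rows.")


def handle_result(result):
    return result.fetchall()


def call_handler(handler, result, fail_on_empty=True):
    """Hypothetical wrapper: run the handler, optionally tolerating its failure."""
    try:
        return handler(result)
    except Exception:
        if fail_on_empty:
            # Mirrors the documented default: the error reaches the task.
            raise
        # With fail_on_empty=False the error is swallowed and there is no output.
        return None


tolerant_output = call_handler(handle_result, NoRowsCursor(), fail_on_empty=False)
print(tolerant_output)
```

A statement such as CREATE TABLE is the typical case: there is nothing to fetch, so a handler that unconditionally calls fetchall() fails unless the failure is tolerated.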