run_raw_sql operator

When to use the run_raw_sql operator

The run_raw_sql operator lets you run any SQL statement, using the same Astro SDK templating that is available in the transform operator. By default, this operator returns None, but you can define the task output with the handler argument. For example, you may want to return a list of the rows produced by a query.
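
To make the default-versus-handler behavior concrete, here is a minimal, stdlib-only sketch of that contract. It is not the Astro SDK implementation: run_sql_sketch is a hypothetical helper, and a sqlite3 cursor stands in for the result object the SDK passes to your handler.

```python
import sqlite3
from typing import Any, Callable, Optional


def run_sql_sketch(
    conn: sqlite3.Connection,
    sql: str,
    handler: Optional[Callable[[sqlite3.Cursor], Any]] = None,
) -> Any:
    """Hypothetical helper mimicking run_raw_sql's documented return behavior."""
    cursor = conn.execute(sql)
    if handler is None:
        # Default: the statement runs, but the task produces no output.
        return None
    # With a handler, whatever the handler returns becomes the output.
    return handler(cursor)


def handle_result(result: sqlite3.Cursor) -> list:
    # Fetch every row, like the handler in the BigQuery example on this page.
    return result.fetchall()


conn = sqlite3.connect(":memory:")
created = run_sql_sketch(conn, "CREATE TABLE ads (id INTEGER)")
conn.executemany("INSERT INTO ads (id) VALUES (?)", [(1,), (2,)])
rows = run_sql_sketch(conn, "SELECT id FROM ads ORDER BY id", handler=handle_result)
print(created)  # None
print(rows)  # [(1,), (2,)]
```

Keeping the handler a plain function of the result object means it stays independent of the SQL it post-processes, so one handler can be reused by several tasks.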

The run_raw_sql function also treats values in double curly braces as Airflow Jinja templates. For more details, see Templating.
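
Airflow renders such templates with Jinja at run time, and its template context includes values such as ds (the logical date as YYYY-MM-DD). The sketch below is a deliberately tiny regex stand-in for that rendering step, written only to show the substitution idea: it is not Jinja, not how the Astro SDK handles Table arguments, and render_sketch plus the context values are hypothetical.

```python
import re
from typing import Mapping

# Matches a bare name inside double curly braces, e.g. "{{table}}" or "{{ ds }}".
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_sketch(sql: str, context: Mapping[str, object]) -> str:
    """Toy substitute for Jinja: replace each {{ name }} with str(context[name]).

    Only bare names are supported; filters, attribute access and control
    blocks that real Jinja handles are not. A missing name raises KeyError.
    """
    return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), sql)


rendered = render_sketch(
    "SELECT * FROM {{table}} WHERE day = '{{ ds }}'",
    {"table": "homes_reporting", "ds": "2022-01-01"},
)
print(rendered)  # SELECT * FROM homes_reporting WHERE day = '2022-01-01'
```

Plain string substitution like this is open to SQL injection if any value comes from untrusted input, so treat it purely as an illustration and rely on the library's own templating in real tasks.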

This example shows how to create a Snowflake table with run_raw_sql without a handler, so the task returns None.

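from datetime import datetime

from airflow.decorators import dag
from astro.sql import run_raw_sql
from astro.sql.table import Table

# Placeholder: replace with the ID of your Snowflake Airflow connection.
SNOWFLAKE_CONN_ID = "snowflake_conn"


@run_raw_sql
def create_table(table: Table):
    """Create the reporting table that will be the target of the append operation."""
    return """
    CREATE TABLE IF NOT EXISTS {{table}} (
      sell number,
      list number,
      variable varchar,
      value number
    );
    """


@dag(start_date=datetime(2021, 12, 1), schedule_interval="@daily", catchup=False)
def example_snowflake_partial_table_with_append():
    homes_reporting = Table(conn_id=SNOWFLAKE_CONN_ID)
    create_results_table = create_table(table=homes_reporting, conn_id=SNOWFLAKE_CONN_ID)


# Calling the decorated function creates the DAG object so Airflow can discover it.
example_snowflake_partial_table_with_append_dag = example_snowflake_partial_table_with_append()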
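
The IF NOT EXISTS clause means the statement becomes a no-op, rather than an error, if it runs when the table already exists. The sketch below checks that locally with Python's built-in sqlite3 as a stand-in for Snowflake (SQLite accepts the number and varchar type names); homes_reporting is a hypothetical name for whatever {{table}} renders to, since the example's Table is unnamed.

```python
import sqlite3

# Hypothetical, hard-coded table name standing in for the rendered {{table}}.
# (Formatting identifiers into SQL is only acceptable here because it is a constant.)
TABLE_NAME = "homes_reporting"

CREATE_SQL = f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
  sell number,
  list number,
  variable varchar,
  value number
);
"""

conn = sqlite3.connect(":memory:")
# Run the statement twice: the second execution does nothing because of
# IF NOT EXISTS, instead of failing with "table already exists".
for _ in range(2):
    conn.execute(CREATE_SQL)

# PRAGMA table_info returns one row per column; index 1 is the column name.
columns = [row[1] for row in conn.execute(f"PRAGMA table_info({TABLE_NAME})")]
print(columns)  # ['sell', 'list', 'variable', 'value']
```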

This example shows how to run a SELECT query in BigQuery and return the rows by using the handler argument. This task returns the results of the query.

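from datetime import datetime

from airflow import DAG
from astro import sql as aql
from astro.files import File
from astro.sql.table import Metadata, Table

# Placeholders: replace with your own bucket, dataset, and connection ID.
ASTRO_S3_BUCKET = "s3://my-bucket"
ASTRO_BIGQUERY_DATASET = "my_dataset"
ASTRO_GCP_CONN_ID = "google_cloud_default"


def handle_result(result):
    # Fetch every row of the query result; this list becomes the task's output.
    return result.fetchall()


with DAG(
    dag_id="example_dynamic_map_task",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["airflow_version:2.3.0"],
) as dag:

    @aql.run_raw_sql(handler=handle_result)
    def get_campaigns(table: Table):
        return """select id from {{table}}"""

    # Load ads.csv from S3 into a new table in the BigQuery dataset,
    # without using the database's native file-transfer support.
    bq_table = aql.load_file(
        input_file=File(path=f"{ASTRO_S3_BUCKET}/ads.csv"),
        output_table=Table(
            metadata=Metadata(
                schema=ASTRO_BIGQUERY_DATASET,
            ),
            conn_id=ASTRO_GCP_CONN_ID,
        ),
        use_native_support=False,
    )
    ids = get_campaigns(bq_table)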
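
As the example shows, the handler receives a result object with fetchall(), which returns one row per result row, so ids holds rows rather than bare values. If a downstream task needs a flat list of ids (the DAG id suggests dynamic task mapping, which Airflow added in 2.3), a handler can unpack each row; plain values are also simpler to pass between tasks than driver-specific row objects. A stdlib sketch, with sqlite3 standing in for BigQuery, made-up sample rows, and handle_ids as a hypothetical name:

```python
import sqlite3
from typing import Any, List


def handle_ids(result: Any) -> List[Any]:
    """Hypothetical handler: return the first column of every row as a flat list."""
    # Each row is a sequence; row[0] is the selected id column.
    return [row[0] for row in result.fetchall()]


# Local stand-in for the table loaded from ads.csv (illustrative rows only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ads (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO ads VALUES (?, ?)", [(101, "spring"), (102, "summer")])

cursor = conn.execute("SELECT id FROM ads ORDER BY id")
ids = handle_ids(cursor)
print(ids)  # [101, 102]
```

In the DAG, you would pass handler=handle_ids to @aql.run_raw_sql in place of handle_result.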