run_raw_sql operator

When to use the run_raw_sql operator

The run_raw_sql operator lets you run any SQL statement while using the same Astro SDK templating that is available in the transform operator. By default, this operator returns None, but you can define the task output with the handler argument. For example, you may want to return the rows of a query as a list.

The run_raw_sql function also treats values in double curly braces ({{ }}) as Airflow Jinja templates. You can find more details on templating at Templating.

This example creates a Snowflake table using run_raw_sql without a handler, so the task returns None.

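from datetime import datetime, timedelta

from airflow.decorators import dag
from astro.sql import run_raw_sql
from astro.table import Table

# SNOWFLAKE_CONN_ID is assumed to be defined elsewhere as the ID of an Airflow connection.


@run_raw_sql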
def create_table(table: Table):
    """Create the reporting data which will be the target of the append method"""
    return """
      CREATE OR REPLACE TABLE {{table}} (
      sell number,
      list number,
      variable varchar,
      value number
    );
    """


@dag(
    start_date=datetime(2021, 12, 1),
    schedule_interval=None,
    catchup=False,
    default_args={
        "email_on_failure": False,
        "retries": 1,
        "retry_delay": timedelta(seconds=5),
    },
)
def example_snowflake_partial_table_with_append():
    homes_reporting = Table(name="homes_reporting", temp=True, conn_id=SNOWFLAKE_CONN_ID)
    create_results_table = create_table(table=homes_reporting, conn_id=SNOWFLAKE_CONN_ID)

This example runs a select query in BigQuery and uses the handler argument to return rows. The task returns the results of the query.

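from datetime import datetime

from airflow import DAG
from astro import sql as aql
from astro.files import File
from astro.table import Metadata, Table

# default_args and the ASTRO_* constants are assumed to be defined elsewhere.


def handle_result(result):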
    return result.fetchall()


with DAG(
    dag_id="example_dynamic_map_task",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["airflow_version:2.3.0"],
    default_args=default_args,
) as dag:

    @aql.run_raw_sql(handler=handle_result)
    def get_campaigns(table: Table):
        return """select id from {{table}}"""

    bq_table = aql.load_file(
        input_file=File(path=f"{ASTRO_S3_BUCKET}/ads.csv"),
        output_table=Table(
            metadata=Metadata(
                schema=ASTRO_BIGQUERY_DATASET,
            ),
            conn_id=ASTRO_GCP_CONN_ID,
        ),
        use_native_support=False,
    )
    ids = get_campaigns(bq_table)
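
The handler contract in the example above can be tried without Airflow or a cloud database. The following is only a minimal sketch, not the SDK's implementation: it assumes that a standard-library sqlite3 cursor is a reasonable stand-in for the result object the handler receives, and run_with_handler is a hypothetical helper written for this illustration.

```python
import sqlite3


def handle_result(result):
    # Same shape as the handler above: take the cursor-like object, return all rows.
    return result.fetchall()


def run_with_handler(conn, sql, handler=None):
    """Hypothetical helper (not part of the Astro SDK).

    Runs the statement and returns None unless a handler is supplied,
    in which case the handler decides what the output is.
    """
    cursor = conn.execute(sql)
    return handler(cursor) if handler is not None else None


# In-memory database standing in for the real warehouse (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ads (id INTEGER)")
conn.executemany("INSERT INTO ads (id) VALUES (?)", [(1,), (2,)])

without_handler = run_with_handler(conn, "SELECT id FROM ads ORDER BY id")
with_handler = run_with_handler(
    conn, "SELECT id FROM ads ORDER BY id", handler=handle_result
)
```

In this sketch, without_handler is None and with_handler is the list of row tuples, which mirrors the default and handler-based outputs described for run_raw_sql.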

Parameters

  • handler - A callback that receives a cursor object from the database. The value the callback returns becomes the output of the task.

  • results_format - Covers common cases where you would otherwise write a handler only to shape the results. Two formats are described here:

    1. List - If you expect the query to return a list of rows, pass results_format='list' instead of passing a handler that calls cursor.fetchall().

    2. Pandas Dataframe - If you want the query result converted to a Pandas DataFrame, pass results_format='pandas_dataframe'.

  • fail_on_empty - The handler function can raise an exception when the database returns no data and the handler calls fetchall(). To suppress this exception, pass fail_on_empty=False. The default value for this parameter is True.

  • query_modifier - Lets you define statements to run before and after the main run_raw_sql statement. For instance, to associate a Snowflake query tag, use query_modifier=QueryModifier(pre_queries=["ALTER SESSION SET QUERY_TAG=<my-query-tag>"]).
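
To make the behaviour of fail_on_empty and query_modifier more concrete, here is a rough sketch under stated assumptions. It does not use the Astro SDK: SketchQueryModifier, its post_queries field, run_statement, and first_id are hypothetical names invented for this illustration, sqlite3 stands in for the database, and the sketch treats any handler exception as the "no data" case, which may be broader than what the SDK does.

```python
import sqlite3
from dataclasses import dataclass, field


@dataclass
class SketchQueryModifier:
    """Hypothetical stand-in for QueryModifier; not the SDK class."""

    pre_queries: list = field(default_factory=list)
    post_queries: list = field(default_factory=list)


def run_statement(conn, sql, handler=None, fail_on_empty=True, query_modifier=None):
    """Hypothetical helper that imitates the parameters described above."""
    modifier = query_modifier or SketchQueryModifier()
    for statement in modifier.pre_queries:
        conn.execute(statement)  # statements that run before the main one
    cursor = conn.execute(sql)
    output = None
    if handler is not None:
        try:
            output = handler(cursor)
        except Exception:
            # Assumption: any handler error counts as "no data" in this sketch.
            if fail_on_empty:
                raise
    for statement in modifier.post_queries:
        conn.execute(statement)  # statements that run after the main one
    return output


def first_id(result):
    # fetchone() returns None for an empty result, so indexing raises TypeError.
    return result.fetchone()[0]


conn = sqlite3.connect(":memory:")
modifier = SketchQueryModifier(
    pre_queries=["CREATE TABLE campaigns (id INTEGER)"],
    post_queries=["DROP TABLE campaigns"],
)
tolerant_output = run_statement(
    conn,
    "SELECT id FROM campaigns",
    handler=first_id,
    fail_on_empty=False,
    query_modifier=modifier,
)
remaining_tables = conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'"
).fetchall()
```

With fail_on_empty=False the handler error is swallowed and the output is None, and the statements in post_queries still run, so no table remains afterwards. Calling run_statement with the default fail_on_empty=True on an empty table would instead let the TypeError propagate.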