astro.sql.operators.transform

Module Contents

Classes

TransformOperator

Given a SQL statement and (optional) tables, execute the SQL statement and output the result into a SQL table.

Functions

transform([python_callable, conn_id, parameters, ...])

Given a python function that returns a SQL statement and (optional) tables, execute the SQL statement and output the result into a SQL table.

transform_file(file_path[, conn_id, parameters, ...])

Execute the SQL in a specified file and return a Table object that can be passed to future tasks.

class astro.sql.operators.transform.TransformOperator(conn_id=None, parameters=None, handler=None, database=None, schema=None, response_limit=-1, response_size=-1, sql='', task_id='', assume_schema_exists=ASSUME_SCHEMA_EXISTS, **kwargs)

Bases: astro.sql.operators.base_decorator.BaseSQLDecoratedOperator

Given a SQL statement and (optional) tables, execute the SQL statement and output the result into a SQL table.

Parameters:
  • conn_id (str | None) –

  • parameters (dict | None) –

  • handler (sqlalchemy.sql.functions.Function | None) –

  • database (str | None) –

  • schema (str | None) –

  • response_limit (int) –

  • response_size (int) –

  • sql (str) –

  • task_id (str) –

  • assume_schema_exists (bool) –

  • kwargs (Any) –

execute(context)

This is the main method to derive when creating an operator.

Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.

Parameters:

context (astro.utils.compat.typing.Context) –

astro.sql.operators.transform.transform(python_callable=None, conn_id='', parameters=None, database=None, schema=None, assume_schema_exists=ASSUME_SCHEMA_EXISTS, **kwargs)

Given a python function that returns a SQL statement and (optional) tables, execute the SQL statement and output the result into a SQL table.

Use this function as a decorator like so:

from astro.sql import transform
from astro.table import Table

@transform
def my_sql_statement(table1: Table, table2: Table) -> Table:
    return "SELECT * FROM {{table1}} JOIN {{table2}}"

In this example, because the parameters are annotated as Table objects, astro knows to automatically convert those arguments into tables (if they are, for example, dataframes). A parameter with any other annotation is left unconverted.
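The sketch below makes this conversion concrete. It is a minimal, hedged example, assuming a recent Airflow with the @dag decorator, an existing postgres_default connection, and that your installed astro-sdk version exposes aql.dataframe and aql.transform as shown; the task names and data are illustrative.

import pandas as pd
import pendulum
from airflow.decorators import dag
from astro import sql as aql
from astro.table import Table

@aql.dataframe
def make_orders() -> pd.DataFrame:
    # A plain pandas DataFrame; no Table object is involved yet.
    return pd.DataFrame({"id": [1, 2], "amount": [10.5, 20.0]})

@aql.transform(conn_id="postgres_default")
def big_orders(orders: Table) -> Table:
    # Because the parameter is annotated as Table, the upstream dataframe is
    # first loaded into a temporary table that this SQL then runs against.
    return "SELECT * FROM {{orders}} WHERE amount > 15"

@dag(start_date=pendulum.datetime(2024, 1, 1), schedule=None, catchup=False)
def conversion_example():
    big_orders(orders=make_orders())

conversion_example()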

You can also pass runtime parameters into the query, like so:

@transform
def my_sql_statement(table1: Table, table2: Table, execution_date) -> Table:
    # Returning a (sql, parameters) tuple binds runtime values into the query.
    return "SELECT * FROM {{table1}} JOIN {{table2}} WHERE date > {{exec_date}}", {
        "exec_date": execution_date
    }
Parameters:
  • python_callable (Callable | None) – This parameter is filled in automatically when you use the transform function as a decorator. This is where the python function gets passed to the wrapping function

  • conn_id (str) – Connection ID for the database you want to connect to. If you do not pass a value, the connection ID is inferred from the first Table passed into the python_callable function (required if there are no table arguments)

  • parameters (collections.abc.Mapping | collections.abc.Iterable | None) – parameters to pass into the SQL query

  • database (str | None) – Database within the SQL instance you want to access. If left blank, defaults to table.metadata.database of the first Table passed to the function (required if there are no table arguments)

  • schema (str | None) – Schema within the SQL instance you want to access. If left blank, defaults to table.metadata.schema of the first Table passed to the function (required if there are no table arguments)

  • assume_schema_exists (bool) – If True, do not check if the output table schema exists or attempt to create it

  • kwargs (Any) – Any keyword arguments supported by BaseOperator are supported (e.g. queue, owner)

Returns:

Transform functions return a Table object that can be passed to future tasks. This table will either be an auto-generated temporary table or will overwrite the table given in the output_table parameter.
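As a hedged sketch of the output_table behaviour, assuming the same imports and postgres_default connection as above (table, column, and schema names are illustrative):

import pendulum
from airflow.decorators import dag
from astro import sql as aql
from astro.table import Metadata, Table

@aql.transform
def join_homes(table1: Table, table2: Table) -> Table:
    # USING (id) assumes both illustrative tables share an id column.
    return "SELECT * FROM {{table1}} JOIN {{table2}} USING (id)"

@dag(start_date=pendulum.datetime(2024, 1, 1), schedule=None, catchup=False)
def output_table_example():
    join_homes(
        table1=Table(name="homes1", conn_id="postgres_default"),
        table2=Table(name="homes2", conn_id="postgres_default"),
        # Without output_table the result lands in an auto-generated temporary
        # table; with it, the named table is created or overwritten.
        output_table=Table(name="joined_homes", metadata=Metadata(schema="public")),
    )

output_table_example()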

Return type:

airflow.decorators.base.TaskDecorator

astro.sql.operators.transform.transform_file(file_path, conn_id='', parameters=None, database=None, schema=None, assume_schema_exists=ASSUME_SCHEMA_EXISTS, **kwargs)

This function executes the SQL in a specified file and returns a Table object that can be passed to future tasks. Tables can be inserted into the query via the parameters kwarg.

Parameters:
  • file_path (str) – File path for the SQL file you would like to parse. Can be an absolute path, or you can use a relative path if the template_searchpath variable is set in your DAG

  • conn_id (str) – Connection ID for the database you want to connect to. If you do not pass a value, the connection ID is inferred from the first Table passed in (via the parameters kwarg) (required if there are no table arguments)

  • parameters (dict | None) – parameters to pass into the SQL query

  • database (str | None) – Database within the SQL instance you want to access. If left blank, defaults to table.metadata.database of the first Table passed to the function (required if there are no table arguments)

  • schema (str | None) – Schema within the SQL instance you want to access. If left blank, defaults to table.metadata.schema of the first Table passed to the function (required if there are no table arguments)

  • assume_schema_exists (bool) – If True, do not check if the output table schema exists or attempt to create it

  • kwargs (Any) – Any keyword arguments supported by BaseOperator are supported (e.g. queue, owner)

Returns:

Transform functions return a Table object that can be passed to future tasks. This table will either be an auto-generated temporary table or will overwrite the table given in the output_table parameter.

Return type:

airflow.models.xcom_arg.XComArg
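A minimal usage sketch, assuming an existing postgres_default connection and a file select_orders.sql that is resolvable from the DAG (or via template_searchpath); the file name, table name, and the SQL shown in the comment are illustrative:

import pendulum
from airflow.decorators import dag
from astro import sql as aql
from astro.table import Table

# select_orders.sql might contain, for example:
#   SELECT * FROM {{input_table}} WHERE amount > 15

@dag(start_date=pendulum.datetime(2024, 1, 1), schedule=None, catchup=False)
def transform_file_example():
    aql.transform_file(
        file_path="select_orders.sql",
        conn_id="postgres_default",
        # Tables are injected into the SQL through the parameters kwarg.
        parameters={"input_table": Table(name="orders")},
    )

transform_file_example()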