astro.sql.operators.transform
Module Contents
Classes
- TransformOperator – Given a SQL statement and (optional) tables, execute the SQL statement and output the result into a SQL table.
Functions
- transform – Given a python function that returns a SQL statement and (optional) tables, execute the SQL statement and output the result into a SQL table.
- transform_file – This function returns a Table object that can be passed to future tasks from a specified SQL file.
- class astro.sql.operators.transform.TransformOperator(conn_id=None, parameters=None, handler=None, database=None, schema=None, response_limit=-1, response_size=-1, sql='', task_id='', assume_schema_exists=ASSUME_SCHEMA_EXISTS, **kwargs)
Bases:
astro.sql.operators.base_decorator.BaseSQLDecoratedOperator
Given a SQL statement and (optional) tables, execute the SQL statement and output the result into a SQL table.
- Parameters:
conn_id (str | None) –
parameters (dict | None) –
handler (sqlalchemy.sql.functions.Function | None) –
database (str | None) –
schema (str | None) –
response_limit (int) –
response_size (int) –
sql (str) –
task_id (str) –
assume_schema_exists (bool) –
kwargs (Any) –
- execute(context)
This is the main method to derive when creating an operator.
Context is the same dictionary used as when rendering jinja templates.
Refer to get_template_context for more context.
- Parameters:
context (astro.utils.compat.typing.Context) –
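In practice this operator is rarely instantiated directly; the transform decorator documented below builds it for you. A minimal sketch (the function and table names are hypothetical), assuming that calling the decorated function returns an XComArg whose underlying task is a TransformOperator:

    from astro import sql as aql
    from astro.table import Table

    @aql.transform
    def latest_rows(source: Table) -> Table:
        return "SELECT * FROM {{source}} ORDER BY created_at DESC LIMIT 10"

    result = latest_rows(source=Table(name="events", conn_id="sqlite_default"))
    # result is an XComArg; result.operator is the operator instance that
    # will run the SQL and write the output table when the DAG executes.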
- astro.sql.operators.transform.transform(python_callable=None, conn_id='', parameters=None, database=None, schema=None, assume_schema_exists=ASSUME_SCHEMA_EXISTS, **kwargs)
Given a python function that returns a SQL statement and (optional) tables, execute the SQL statement and output the result into a SQL table.
Use this function as a decorator like so:
    @transform
    def my_sql_statement(table1: Table, table2: Table) -> Table:
        return "SELECT * FROM {{table1}} JOIN {{table2}}"
In this example, by annotating the parameters as Table objects, astro knows to automatically convert those objects into tables (if they are, for example, a dataframe). Any type besides Table will lead astro to assume you do not want the parameter converted.
You can also pass parameters into the query like so:

    @transform
    def my_sql_statement(table1: Table, table2: Table, execution_date) -> Table:
        return "SELECT * FROM {{table1}} JOIN {{table2}} WHERE date > {{exec_date}}", {
            "exec_date": execution_date,
        }
- Parameters:
python_callable (Callable | None) – This parameter is filled in automatically when you use the transform function as a decorator; it is how the python function gets passed to the wrapping function.
conn_id (str) – Connection ID for the database you want to connect to. If you do not pass in a value, the connection ID is inferred from the first table passed into the python_callable function. (Required if there are no table arguments.)
parameters (collections.abc.Mapping | collections.abc.Iterable | None) – Parameters to pass into the SQL query.
database (str | None) – Database within the SQL instance you want to access. If left blank, this defaults to table.metadata.database of the first Table passed to the function. (Required if there are no table arguments.)
schema (str | None) – Schema within the SQL instance you want to access. If left blank, this defaults to table.metadata.schema of the first Table passed to the function. (Required if there are no table arguments.)
assume_schema_exists (bool) – If True, do not check if the output table schema exists or attempt to create it
kwargs (Any) – Any keyword argument supported by BaseOperator is supported (e.g. queue, owner).
- Returns:
Transform functions return a Table object that can be passed to future tasks. This table will be either an auto-generated temporary table, or it will overwrite a table given in the output_table parameter.
- Return type:
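A minimal end-to-end sketch of the decorator inside a DAG (the table names, connection ID, and DAG details are hypothetical; assumes the usual astro-sdk imports and that output_table can be passed at call time):

    from datetime import datetime

    from airflow.decorators import dag
    from astro import sql as aql
    from astro.table import Table

    @aql.transform
    def join_orders(orders: Table, customers: Table) -> Table:
        return "SELECT * FROM {{orders}} JOIN {{customers}} USING (customer_id)"

    @dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
    def example_transform():
        # conn_id is inferred from the input tables; pass output_table to name
        # the result, otherwise an auto-generated temporary table is created.
        join_orders(
            orders=Table(name="orders", conn_id="postgres_default"),
            customers=Table(name="customers", conn_id="postgres_default"),
            output_table=Table(name="joined_orders"),
        )

    example_transform()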
- astro.sql.operators.transform.transform_file(file_path, conn_id='', parameters=None, database=None, schema=None, assume_schema_exists=ASSUME_SCHEMA_EXISTS, **kwargs)
This function returns a Table object that can be passed to future tasks from a specified SQL file. Tables can be inserted via the parameters kwarg.
- Parameters:
file_path (str) – File path for the SQL file you would like to parse. Can be an absolute path, or you can use a relative path if the template_searchpath variable is set in your DAG
conn_id (str) – Connection ID for the database you want to connect to. If you do not pass in a value, the connection ID is inferred from the first table passed in. (Required if there are no table arguments.)
parameters (dict | None) – Parameters to pass into the SQL query.
database (str | None) – Database within the SQL instance you want to access. If left blank, this defaults to table.metadata.database of the first Table passed to the function. (Required if there are no table arguments.)
schema (str | None) – Schema within the SQL instance you want to access. If left blank, this defaults to table.metadata.schema of the first Table passed to the function. (Required if there are no table arguments.)
assume_schema_exists (bool) – If True, do not check if the output table schema exists or attempt to create it
kwargs (Any) – Any keyword argument supported by BaseOperator is supported (e.g. queue, owner).
- Returns:
Transform functions return a Table object that can be passed to future tasks. This table will be either an auto-generated temporary table, or it will overwrite a table given in the output_table parameter.
- Return type:
airflow.models.xcom_arg.XComArg
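A minimal sketch of transform_file usage (the SQL file path, table names, and placeholder are hypothetical; assumes the file contains a templated query such as SELECT * FROM {{input_table}} and that output_table can be passed through kwargs):

    from astro import sql as aql
    from astro.table import Table

    # The SQL file is resolved relative to the DAG's template_searchpath
    # (or given as an absolute path) and may reference {{input_table}}.
    top_rows = aql.transform_file(
        file_path="sql/top_rows.sql",
        parameters={"input_table": Table(name="orders", conn_id="postgres_default")},
        # output_table is optional; omit it to get an auto-generated temporary table.
        output_table=Table(name="top_rows"),
    )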