astro.sql.operators.base_decorator
Module Contents
Classes
BaseSQLDecoratedOperator: Handles all decorator classes that can return a SQL function
- class astro.sql.operators.base_decorator.BaseSQLDecoratedOperator(conn_id=None, parameters=None, handler=None, database=None, schema=None, response_limit=-1, response_size=-1, sql='', query_modifier=QueryModifier(), **kwargs)
Bases: astro.sql.operators.upstream_task_mixin.UpstreamTaskMixin, airflow.decorators.base.DecoratedOperator
Handles all decorator classes that can return a SQL function
- Parameters:
conn_id (str | None) –
parameters (dict | None) –
handler (sqlalchemy.sql.functions.Function | None) –
database (str | None) –
schema (str | None) –
response_limit (int) –
response_size (int) –
sql (str) –
query_modifier (astro.query_modifier.QueryModifier) –
kwargs (Any) –
- template_fields: Sequence[str] = ('sql', 'parameters', 'op_args', 'op_kwargs')
- template_ext: Sequence[str] = ('.sql',)
- database_impl()
- Return type:
- render_template_fields(context, jinja_env=None)
Template all attributes listed in template_fields.
This mutates the attributes in-place and is irreversible.
- Parameters:
context (astro.utils.compat.typing.Context) – Dict with values to apply on content
jinja_env (jinja2.Environment | None) – Jinja environment
- Return type:
airflow.models.BaseOperator | None
- execute(context)
Override this method when deriving an operator.
The context is the same dictionary that is used when rendering Jinja templates.
Refer to get_template_context for more details.
- Parameters:
context (astro.utils.compat.typing.Context) –
- Return type:
None
- create_output_table_if_needed()
If the user has not supplied an output table, this function creates one from scratch; otherwise, it populates the supplied output table with the necessary metadata.
- Return type:
None
- read_sql_from_function()
This function runs the provided Python function and stores the resulting SQL query in the sql attribute. If the function also returns a dictionary of parameters, it is stored in the parameters attribute.
- Return type:
None
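The behaviour described for read_sql_from_function can be sketched as follows. This is a minimal illustration, not the library's implementation: resolve_sql and top_customers are hypothetical names, and the sketch assumes the decorated function returns either a query string or a (query, parameters) tuple.

```python
from typing import Any, Callable, Dict, Tuple


def resolve_sql(py_callable: Callable[..., Any], *args: Any, **kwargs: Any) -> Tuple[str, Dict[str, Any]]:
    """Run the callable and split its result into (sql, parameters)."""
    returned = py_callable(*args, **kwargs)
    # Assumption: a two-item tuple means "query plus parameter dictionary".
    if isinstance(returned, tuple) and len(returned) == 2:
        sql, parameters = returned
        return sql, dict(parameters)
    # Otherwise the return value is taken to be the query itself.
    return str(returned), {}


def top_customers(limit: int):
    # Hypothetical user function: returns the query and its bind parameters.
    return "SELECT * FROM customers LIMIT :limit", {"limit": limit}


sql, parameters = resolve_sql(top_customers, 5)
```

In the operator itself, the two values would end up in the sql and parameters attributes instead of being returned.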
- move_function_params_into_sql_params(context)
Pulls values from the function's op_args and op_kwargs and places them into parameters so that SQLAlchemy can parse them.
- Parameters:
context (dict) – Airflow’s Context dictionary used for rendering templates
- Return type:
None
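One way to picture this step is to match positional arguments to the function's parameter names and merge everything into a single dictionary. The sketch below assumes that approach; merge_call_args_into_parameters and orders_since are hypothetical names, and the rendering of string values against the Airflow context is left out.

```python
import inspect
from typing import Any, Callable, Dict, Optional, Tuple


def merge_call_args_into_parameters(
    py_callable: Callable[..., Any],
    op_args: Tuple[Any, ...],
    op_kwargs: Dict[str, Any],
    parameters: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return a new parameter dict that includes the call's args and kwargs."""
    merged = dict(parameters or {})
    # Keyword arguments already carry their parameter names.
    merged.update(op_kwargs)
    # Positional arguments are matched to names in signature order.
    names = list(inspect.signature(py_callable).parameters)
    for name, value in zip(names, op_args):
        merged[name] = value
    return merged


def orders_since(table_name, min_total, status="open"):
    # Hypothetical user function; only its signature matters here.
    return "SELECT * FROM orders WHERE total > :min_total AND status = :status"


params = merge_call_args_into_parameters(
    orders_since, ("orders", 100), {"status": "paid"}
)
```

Here params holds table_name, min_total, and status under their signature names, ready to be passed along as bind parameters.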
- translate_jinja_to_sqlalchemy_template(context)
This function handles all Jinja templating to ensure that the SQL statement is ready for processing by SQLAlchemy. The database object is used here because different databases have different templating rules.
When running functions through the aql.transform and aql.render functions, the parameters given to the SQL statement must be added to the Airflow context dictionary. This is what allows Jinja to render those parameters into the SQL function when users use the {{}} syntax (e.g. “SELECT * FROM {{input_table}}”).
Under this system, Table objects are handled differently from other variables. Because the parameter dictionary is later passed to SQLAlchemy, the safest default from a security standpoint is the :variable syntax, which ensures that SQLAlchemy treats the value as untrusted input and binds it as a parameter. Table objects, however, must be rendered as raw values or the query will not work. For this reason we recommend consulting the documentation of your database to see what best practices exist (e.g. Identifier wrappers in Snowflake).
- Parameters:
context (dict) –
- Return type:
None
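The split between raw table names and :variable placeholders can be sketched as below. This is an illustration under stated assumptions, not the library's code: Table is a hypothetical stand-in class, to_bind_syntax is a hypothetical helper, and a small regular expression stands in for the Jinja rendering step so that the example needs only the standard library.

```python
import re
from dataclasses import dataclass
from typing import Any, Dict, Tuple


@dataclass
class Table:
    """Hypothetical stand-in for a table object; only the name matters here."""
    name: str


# Matches {{name}} and {{ name }}; a stand-in for real Jinja rendering.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def to_bind_syntax(sql: str, parameters: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Render table names raw and turn every other parameter into :name."""
    context: Dict[str, str] = {}
    bind_params: Dict[str, Any] = {}
    for key, value in parameters.items():
        if isinstance(value, Table):
            # Identifiers cannot be bound, so the raw name is substituted.
            context[key] = value.name
        else:
            # Ordinary values stay out of the SQL text and are bound later.
            context[key] = ":" + key
            bind_params[key] = value
    # Unknown placeholders are left untouched.
    rendered = PLACEHOLDER.sub(lambda m: context.get(m.group(1), m.group(0)), sql)
    return rendered, bind_params


sql, binds = to_bind_syntax(
    "SELECT * FROM {{input_table}} WHERE total > {{min_total}}",
    {"input_table": Table("orders"), "min_total": 100},
)
```

The sketch does no validation or quoting of the table name, which is exactly the gap the recommendation about database-specific identifier handling is meant to cover.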
- get_openlineage_facets_on_complete(task_instance)
Returns the lineage data
- get_source_code(py_callable)
Return the source code for the lineage
- Parameters:
py_callable (Callable) –
- Return type:
str | None
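The str | None return type suggests that the source may not always be retrievable. A minimal sketch of that pattern, assuming the standard inspect module is used (safe_get_source is a hypothetical name):

```python
import inspect
from typing import Any, Callable, Optional


def safe_get_source(py_callable: Callable[..., Any]) -> Optional[str]:
    """Return the callable's source text, or None when it cannot be read."""
    try:
        return inspect.getsource(py_callable)
    except (TypeError, OSError):
        # TypeError: built-in or otherwise unsupported object.
        # OSError: the source file is not available.
        return None


builtin_source = safe_get_source(len)  # None: built-ins have no Python source
```

Returning None instead of raising keeps lineage collection from failing a task just because a function was defined somewhere without a source file, such as an interactive session.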
- load_op_arg_dataframes_into_sql(conn_id, op_args, output_table)
Identify dataframes in op_args and load them to a table.
- Parameters:
conn_id (str) – Connection identifier to be used to load content to the target table
op_args (tuple) – user-defined decorator’s args
output_table (astro.table.BaseTable) – Similar table to which the dataframe content will be written
- Returns:
New op_args, in which dataframes are replaced by tables
- Return type:
tuple
- load_op_kwarg_dataframes_into_sql(conn_id, op_kwargs, output_table)
Identify dataframes in op_kwargs and load them to a table.
- Parameters:
conn_id (str) – Connection identifier to be used to load content to the target table
op_kwargs (dict) – user-defined decorator’s kwargs
output_table (astro.table.BaseTable) – Similar table to which the dataframe content will be written
- Returns:
New op_kwargs, in which dataframes are replaced by tables
- Return type:
dict
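Both load_op_arg_dataframes_into_sql and load_op_kwarg_dataframes_into_sql describe the same pattern: walk the arguments, load each dataframe into a table, and put the table in the dataframe's place. The sketch below illustrates that pattern with standard-library stand-ins only. Frame, TableRef, InMemoryLoader, and the two swap functions are all hypothetical names, and no database or pandas is involved.

```python
from dataclasses import dataclass
from itertools import count
from typing import Any, Dict, List, Tuple


@dataclass
class Frame:
    """Hypothetical stand-in for a dataframe."""
    rows: List[dict]


@dataclass(frozen=True)
class TableRef:
    """Hypothetical stand-in for a table handle."""
    name: str


class InMemoryLoader:
    """Hypothetical loader that 'writes' frames into a dict keyed by table name."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix
        self.tables: Dict[str, List[dict]] = {}
        self._ids = count(1)

    def load(self, frame: Frame) -> TableRef:
        # Each frame gets its own freshly named table.
        table = TableRef(f"{self.prefix}_{next(self._ids)}")
        self.tables[table.name] = list(frame.rows)
        return table


def swap_frames_in_args(loader: InMemoryLoader, op_args: Tuple[Any, ...]) -> Tuple[Any, ...]:
    """Return new args with every Frame replaced by the table it was loaded to."""
    return tuple(loader.load(a) if isinstance(a, Frame) else a for a in op_args)


def swap_frames_in_kwargs(loader: InMemoryLoader, op_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Return new kwargs with every Frame replaced by the table it was loaded to."""
    return {k: loader.load(v) if isinstance(v, Frame) else v for k, v in op_kwargs.items()}


loader = InMemoryLoader("tmp")
new_args = swap_frames_in_args(loader, (Frame([{"id": 1}]), 42))
new_kwargs = swap_frames_in_kwargs(loader, {"df": Frame([{"id": 2}]), "limit": 10})
```

Non-dataframe values pass through unchanged, so the downstream SQL step only ever sees tables and plain values.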