astro.sql.operators.base_decorator

Module Contents

Classes

BaseSQLDecoratedOperator

Base operator for every decorator whose Python function returns a SQL statement.

class astro.sql.operators.base_decorator.BaseSQLDecoratedOperator(conn_id=None, parameters=None, handler=None, database=None, schema=None, response_limit=-1, response_size=-1, sql='', query_modifier=QueryModifier(), **kwargs)

Bases: astro.sql.operators.upstream_task_mixin.UpstreamTaskMixin, airflow.decorators.base.DecoratedOperator

Base operator for every decorator whose Python function returns a SQL statement.

Parameters:
  • conn_id (str | None) –

  • parameters (dict | None) –

  • handler (sqlalchemy.sql.functions.Function | None) –

  • database (str | None) –

  • schema (str | None) –

  • response_limit (int) –

  • response_size (int) –

  • sql (str) –

  • query_modifier (astro.query_modifier.QueryModifier) –

  • kwargs (Any) –

template_fields: Sequence[str] = ('sql', 'parameters', 'op_args', 'op_kwargs')

template_ext: Sequence[str] = ('.sql',)

database_impl()

Return type:

astro.databases.base.BaseDatabase

render_template_fields(context, jinja_env=None)

Template all attributes listed in template_fields.

This mutates the attributes in-place and is irreversible.

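Parameters:
  • context (astro.utils.compat.typing.Context) – Context dictionary with the values to apply when rendering

  • jinja_env (jinja2.Environment | None) – Jinja environment to use for rendering

Return type: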

airflow.models.BaseOperator | None
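The in-place templating described above can be illustrated without Airflow. This is a minimal sketch, not Airflow's implementation: `SketchOperator`, the `render` helper and the in-memory `TEMPLATES` dict (standing in for .sql files on disk) are hypothetical, and a regex replaces Jinja. It does mirror Airflow's rule that a templated string ending in one of the template_ext suffixes is treated as a template file name rather than as inline text.

```python
import re
from typing import Any, Dict, Sequence

# Hypothetical stand-in for .sql template files on disk.
TEMPLATES: Dict[str, str] = {"query.sql": "SELECT * FROM {{ table }}"}

# A regex stands in for Jinja: it only understands {{ name }} placeholders.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(text: str, context: Dict[str, Any]) -> str:
    """Replace each {{ name }} placeholder with the matching context value."""
    return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), text)


class SketchOperator:
    template_fields: Sequence[str] = ("sql", "parameters")
    template_ext: Sequence[str] = (".sql",)

    def __init__(self, sql: str, parameters: Dict[str, Any]) -> None:
        self.sql = sql
        self.parameters = parameters

    def _render_value(self, value: Any, context: Dict[str, Any]) -> Any:
        if isinstance(value, str):
            # Strings ending in a template extension are loaded as template files.
            if any(value.endswith(ext) for ext in self.template_ext):
                value = TEMPLATES[value]
            return render(value, context)
        if isinstance(value, dict):
            # Nested containers are rendered value by value.
            return {k: self._render_value(v, context) for k, v in value.items()}
        return value

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        # Overwrite each templated attribute in place; the raw template is lost.
        for field in self.template_fields:
            setattr(self, field, self._render_value(getattr(self, field), context))


op = SketchOperator(sql="query.sql", parameters={"day": "{{ ds }}"})
op.render_template_fields({"table": "orders", "ds": "2024-01-01"})
print(op.sql)         # SELECT * FROM orders
print(op.parameters)  # {'day': '2024-01-01'}
```

Because the attributes are overwritten, calling render_template_fields a second time would render already-rendered text, which is why the operation is described as irreversible.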

execute(context)

This is the main method to derive when creating an operator.

The context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.

Parameters:

context (astro.utils.compat.typing.Context) –

Return type:

None

create_output_table_if_needed()

If the user has not supplied an output table, this function creates one from scratch; otherwise, it populates the supplied output table with the necessary metadata.

Return type:

None
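The "create or complete" behaviour can be sketched with a simplified table model. Everything here is hypothetical: `SketchTable`, its fields, the `tmp_` name prefix and the free function `create_output_table_if_needed` are assumptions for illustration and do not reproduce astro.table.BaseTable or the operator's actual metadata handling.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchTable:
    """Hypothetical, simplified table: a name plus connection metadata."""
    name: str = ""
    conn_id: str = ""
    schema: Optional[str] = None
    database: Optional[str] = None
    temp: bool = False


def create_output_table_if_needed(
    output_table: Optional[SketchTable],
    conn_id: str,
    schema: Optional[str] = None,
    database: Optional[str] = None,
) -> SketchTable:
    if output_table is None:
        # No table supplied: create a temporary one with a unique generated name.
        return SketchTable(
            name=f"tmp_{uuid.uuid4().hex[:8]}",
            conn_id=conn_id,
            schema=schema,
            database=database,
            temp=True,
        )
    # Table supplied: fill in only the metadata the user left empty.
    output_table.conn_id = output_table.conn_id or conn_id
    output_table.schema = output_table.schema or schema
    output_table.database = output_table.database or database
    return output_table


generated = create_output_table_if_needed(None, conn_id="postgres_default", schema="tmp_astro")
print(generated.temp, generated.name.startswith("tmp_"))  # True True

user_table = SketchTable(name="orders", schema="sales")
filled = create_output_table_if_needed(user_table, conn_id="postgres_default", schema="tmp_astro")
print(filled.conn_id, filled.schema)  # postgres_default sales
```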

read_sql_from_function()

This function runs the provided Python function and stores the resulting SQL query in the sql attribute. It can also store query parameters if the user provides them as a dictionary.

Return type:

None
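A minimal sketch of this step, under one loudly stated assumption: the decorated function returns either a SQL string or a `(sql, parameters)` pair whose second element is the parameter dictionary. The source only says parameters can be stored "if the user provides a dictionary", so the tuple convention and the `SketchSQLOperator` class are hypothetical.

```python
from typing import Any, Callable, Dict, Tuple, Union

# Assumption: the decorated function returns a SQL string or a (sql, parameters) pair.
SqlResult = Union[str, Tuple[str, Dict[str, Any]]]


class SketchSQLOperator:
    """Hypothetical, heavily simplified operator; only the attributes used here exist."""

    def __init__(self, python_callable: Callable[..., SqlResult], *op_args: Any, **op_kwargs: Any) -> None:
        self.python_callable = python_callable
        self.op_args = op_args
        self.op_kwargs = op_kwargs
        self.sql = ""
        self.parameters: Dict[str, Any] = {}

    def read_sql_from_function(self) -> None:
        # Run the user's function with the arguments stored on the operator.
        result = self.python_callable(*self.op_args, **self.op_kwargs)
        if isinstance(result, tuple):
            # A (sql, parameters) pair: keep the SQL and merge in the parameters.
            self.sql, extra = result
            self.parameters.update(extra)
        else:
            self.sql = result


def top_customers(limit: int) -> SqlResult:
    return "SELECT * FROM customers LIMIT :limit", {"limit": limit}


op = SketchSQLOperator(top_customers, 10)
op.read_sql_from_function()
print(op.sql)         # SELECT * FROM customers LIMIT :limit
print(op.parameters)  # {'limit': 10}
```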

move_function_params_into_sql_params(context)

Pulls the values from the function’s op_args and op_kwargs and places them into parameters for SQLAlchemy to parse.

Parameters:

context (dict) – Airflow’s Context dictionary used for rendering templates

Return type:

None
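One way to map positional and keyword arguments onto named SQL parameters is to bind them to the function's signature with the standard-library `inspect` module. This is a sketch, not the astro-sdk code: the free function below is hypothetical, it leaves out the Airflow context argument, and letting explicitly supplied parameters override same-named function arguments is an assumption.

```python
import inspect
from typing import Any, Callable, Dict


def move_function_params_into_sql_params(
    python_callable: Callable[..., Any],
    op_args: tuple,
    op_kwargs: Dict[str, Any],
    parameters: Dict[str, Any],
) -> Dict[str, Any]:
    # Match positional and keyword arguments to the function's parameter names.
    bound = inspect.signature(python_callable).bind(*op_args, **op_kwargs)
    # Include parameters the caller omitted but that have default values.
    bound.apply_defaults()
    # Assumption: explicit parameters win over function arguments with the same name.
    return {**bound.arguments, **parameters}


def filter_orders(input_table, min_total=100):
    return "SELECT * FROM {{ input_table }} WHERE total >= {{ min_total }}"


params = move_function_params_into_sql_params(filter_orders, ("orders",), {}, {"region": "EU"})
print(params)  # {'input_table': 'orders', 'min_total': 100, 'region': 'EU'}
```

Binding through the signature, rather than zipping op_args with a list of names by hand, also raises a TypeError early if the caller passes an argument the function does not accept.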

translate_jinja_to_sqlalchemy_template(context)

This function handles all Jinja templating so that the SQL statement is ready for processing by SQLAlchemy. It uses the database object because different databases have different templating rules.

When running functions through aql.transform and aql.render, we need to add the parameters given to the SQL statement to the Airflow context dictionary. This is what allows Jinja to render those parameters into the SQL statement when users write the {{}} syntax (e.g. “SELECT * FROM {{input_table}}”).

Table objects are handled differently from other variables. Since the parameter dictionary is later passed to SQLAlchemy, the safest default (from a security standpoint) is the :variable syntax, which makes SQLAlchemy treat the value as untrusted data and send it as a bound parameter instead of splicing it into the SQL text. Table objects, however, must be rendered as raw values or the query will not work, because a table name is an identifier and cannot be passed as a bound parameter. For this reason, we recommend checking your database’s documentation for the best practices it offers (e.g. Identifier wrappers in Snowflake).

Parameters:

context (dict) –

Return type:

None
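The split between raw table names and :variable placeholders can be demonstrated end to end with the standard library. In this sketch a regex stands in for Jinja, and sqlite3's named parameters (which use the same :name style as SQLAlchemy's text()) stand in for SQLAlchemy; `SketchTable` and `translate_template` are hypothetical names, and the real method delegates the exact syntax to the database object.

```python
import re
import sqlite3
from dataclasses import dataclass
from typing import Any, Dict, Tuple


@dataclass
class SketchTable:
    """Hypothetical stand-in for an astro Table; only the name is modelled."""
    name: str


# A regex stands in for Jinja: it only understands {{ name }} placeholders.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def translate_template(sql: str, parameters: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Inline table names; turn every other placeholder into a :name bind parameter."""
    binds: Dict[str, Any] = {}

    def replace(match: re.Match) -> str:
        key = match.group(1)
        value = parameters[key]
        if isinstance(value, SketchTable):
            # Identifiers cannot be bound, so the raw name goes into the SQL text.
            # This is only safe for trusted or validated table names.
            return value.name
        # Everything else is sent to the driver as a bound value, never spliced in.
        binds[key] = value
        return f":{key}"

    return _PLACEHOLDER.sub(replace, sql), binds


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, total REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 50.0), (2, 150.0)])

sql, binds = translate_template(
    "SELECT id FROM {{ input_table }} WHERE total >= {{ min_total }}",
    {"input_table": SketchTable("orders"), "min_total": 100},
)
print(sql)                                  # SELECT id FROM orders WHERE total >= :min_total
print(conn.execute(sql, binds).fetchall())  # [(2,)]
```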

get_openlineage_facets_on_complete(task_instance)

Return the OpenLineage facets (lineage data) for task_instance once the task has completed.

get_source_code(py_callable)

Return the source code of py_callable for inclusion in the lineage data.

Parameters:

py_callable (Callable) –

Return type:

str | None
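A plausible way to obtain the source text, with the str | None return type, is the standard-library `inspect.getsource`, which raises OSError when the source cannot be retrieved and TypeError for built-ins. This is a sketch assuming the lineage code needs only the callable's source text; the source does not say how the astro-sdk method is implemented.

```python
import inspect
from typing import Callable, Optional


def get_source_code(py_callable: Callable) -> Optional[str]:
    try:
        return inspect.getsource(py_callable)
    except (OSError, TypeError):
        # Built-ins, C extensions and code typed into a REPL have no retrievable source.
        return None


def transform(table):
    return "SELECT * FROM {{ table }}"


print(get_source_code(transform))  # the "def transform" block, when run from a file
print(get_source_code(len))        # None
```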

load_op_arg_dataframes_into_sql(conn_id, op_args, output_table)

Identify dataframes in op_args and load each of them into a table.

Parameters:
  • conn_id (str) – Connection identifier used to load the dataframe content into the target table

  • op_args (tuple) – user-defined decorator’s positional arguments

  • output_table (astro.table.BaseTable) – Table used as a template: each dataframe is written to a new table similar to this one

Returns:

New op_args, in which dataframes are replaced by tables

Return type:

tuple

load_op_kwarg_dataframes_into_sql(conn_id, op_kwargs, output_table)

Identify dataframes in op_kwargs and load each of them into a table.

Parameters:
  • conn_id (str) – Connection identifier used to load the dataframe content into the target table

  • op_kwargs (dict) – user-defined decorator’s keyword arguments

  • output_table (astro.table.BaseTable) – Table used as a template: each dataframe is written to a new table similar to this one

Returns:

New op_kwargs, in which dataframes are replaced by tables

Return type:

dict
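The two loaders above follow the same substitution pattern: every dataframe argument is written to a new table and replaced by a reference to that table, while other arguments pass through unchanged. The dependency-free sketch below makes several assumptions: `FrameStandIn` replaces a pandas DataFrame, `TableRef` replaces astro.table.BaseTable, an sqlite3 connection replaces the conn_id-based database layer, and "similar table" is modelled as reusing output_table's name plus a counter. None of these names belong to astro-sdk.

```python
import itertools
import sqlite3
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class FrameStandIn:
    """Hypothetical stand-in for a pandas DataFrame: column names plus row tuples."""
    columns: List[str]
    rows: List[tuple]


@dataclass
class TableRef:
    """Hypothetical stand-in for astro.table.BaseTable: only the name is modelled."""
    name: str


_suffix = itertools.count(1)


def _load_into_similar_table(conn: sqlite3.Connection, frame: FrameStandIn, output_table: TableRef) -> TableRef:
    # Each dataframe gets its own new table, named after output_table plus a counter.
    target = TableRef(f"{output_table.name}_{next(_suffix)}")
    conn.execute(f"CREATE TABLE {target.name} ({', '.join(frame.columns)})")
    placeholders = ", ".join("?" for _ in frame.columns)
    conn.executemany(f"INSERT INTO {target.name} VALUES ({placeholders})", frame.rows)
    return target


def load_op_arg_dataframes(conn: sqlite3.Connection, op_args: tuple, output_table: TableRef) -> tuple:
    # Replace every dataframe positional argument with the table it was loaded into.
    return tuple(
        _load_into_similar_table(conn, arg, output_table) if isinstance(arg, FrameStandIn) else arg
        for arg in op_args
    )


def load_op_kwarg_dataframes(
    conn: sqlite3.Connection, op_kwargs: Dict[str, Any], output_table: TableRef
) -> Dict[str, Any]:
    # Same substitution for keyword arguments; non-dataframe values pass through.
    return {
        key: _load_into_similar_table(conn, value, output_table) if isinstance(value, FrameStandIn) else value
        for key, value in op_kwargs.items()
    }


conn = sqlite3.connect(":memory:")
frame = FrameStandIn(columns=["id", "total"], rows=[(1, 50.0), (2, 150.0)])
output = TableRef("output")

args = load_op_arg_dataframes(conn, (frame, 10), output)
kwargs = load_op_kwarg_dataframes(conn, {"orders": frame, "limit": 5}, output)
print(args)    # (TableRef(name='output_1'), 10)
print(kwargs)  # {'orders': TableRef(name='output_2'), 'limit': 5}
```

Returning new containers instead of mutating op_args and op_kwargs matches the documented return values ("New op_args", "New op_kwargs").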