astro.sql.operators.cleanup
Module Contents

Classes

CleanupOperator – Clean up temporary tables at the end of a DAG run.

Functions

filter_for_temp_tables
cleanup – Clean up temporary tables once either the DAG or upstream tasks are done.

Attributes
- astro.sql.operators.cleanup.MappedOperator
- astro.sql.operators.cleanup.OPERATOR_CLASSES_WITH_TABLE_OUTPUT = ()
- astro.sql.operators.cleanup.filter_for_temp_tables(task_outputs)
- Parameters:
task_outputs (list[Any]) –
- Return type:
list[astro.table.TempTable]
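The filter amounts to an isinstance check over a heterogeneous list of task outputs. A minimal sketch, using a stand-in class rather than the real astro.table.TempTable:

```python
# Hedged sketch of filter_for_temp_tables: keep only temp-table
# objects out of a mixed list of task outputs. TempTable here is a
# stand-in for astro.table.TempTable, not the real class.

class TempTable:
    def __init__(self, name):
        self.name = name

def filter_for_temp_tables(task_outputs):
    return [out for out in task_outputs if isinstance(out, TempTable)]

outputs = [TempTable("_tmp_a"), {"rows": 3}, TempTable("_tmp_b"), None]
print([t.name for t in filter_for_temp_tables(outputs)])  # ['_tmp_a', '_tmp_b']
```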
- exception astro.sql.operators.cleanup.AstroCleanupException
Bases:
airflow.exceptions.AirflowException
Base class for all Airflow’s errors.
Each custom exception should be derived from this class.
- msg = 'When using a synchronous executor (e.g. SequentialExecutor and DebugExecutor), the first run of...'
- class astro.sql.operators.cleanup.CleanupOperator(*, tables_to_cleanup=None, task_id='', retries=3, retry_delay=timedelta(seconds=10), run_sync_mode=False, skip_on_failure=False, **kwargs)
Bases:
astro.sql.operators.base_operator.AstroSQLBaseOperator
Clean up temporary tables at the end of a DAG run.
Temporary tables are the ones that are generated by the SDK (where you do not pass a name arg to Table) or the ones whose names start with _tmp. By default, if no tables are passed, the task will wait for all other tasks to run before deleting all temporary tables.
If using a synchronous executor (e.g. SequentialExecutor and DebugExecutor), this task will initially fail on purpose, so the executor is unblocked and can run other tasks. Users may have to define custom values for retries and retry_delay if they intend to use one of these executors.
- Parameters:
tables_to_cleanup (list[astro.table.BaseTable] | None) – List of tables to drop at the end of the DAG run
task_id (str) – Optional custom task id
retries (int) – The number of retries that should be performed before failing the task. Very relevant if using a synchronous executor. The default is 3.
retry_delay (datetime.timedelta) – Delay between running retries. Very relevant if using a synchronous executor. The default is 10s.
run_sync_mode (bool) – Whether to wait for the DAG to finish or not. Set to True if you want to immediately clean all tables. Note that if you supply anything to tables_to_cleanup this argument is ignored.
skip_on_failure (bool) – Skip cleanup if any upstream task fails. Useful while debugging failed tasks, to prevent temporary tables upstream from being deleted prematurely. The default is False.
- template_fields = ('tables_to_cleanup',)
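The synchronous-executor workaround described above can be pictured as a fail-and-retry loop: the first execute() raises on purpose so the executor is free to run sibling tasks, and Airflow's retry machinery invokes it again later. A dependency-free sketch, where all names are illustrative rather than the operator's real internals:

```python
# Hedged sketch of the synchronous-executor pattern: execute()
# deliberately raises while sibling tasks are still running, and the
# scheduler's retries (retries=3 by default) re-invoke it.

class DagStillRunning(Exception):
    pass

def execute_once(unfinished_sibling_tasks):
    if unfinished_sibling_tasks:
        # Fail on purpose so e.g. a SequentialExecutor is unblocked.
        raise DagStillRunning(f"waiting on {len(unfinished_sibling_tasks)} tasks")
    return "dropped temp tables"

# Simulate retries=3, with a sibling task finishing between attempts
# (in real life, retry_delay controls the gap between attempts).
siblings = ["load", "transform"]
result = None
for attempt in range(3):
    try:
        result = execute_once(siblings)
        break
    except DagStillRunning:
        siblings.pop()  # a sibling finishes before the next retry
print(result)  # dropped temp tables
```

This is why retries and retry_delay matter with synchronous executors: too few retries, or too short a delay, and the cleanup task exhausts its attempts before the rest of the DAG finishes.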
- execute(context)
This is the main method to derive when creating an operator.
Context is the same dictionary used as when rendering jinja templates.
Refer to get_template_context for more context.
- Parameters:
context (astro.utils.compat.typing.Context) –
- Return type:
None
- drop_table(table)
- Parameters:
table (astro.table.BaseTable) –
- Return type:
None
- wait_for_dag_to_finish(context)
In the event that we are not given any tables, we will want to wait for all other tasks to finish before we delete temporary tables. This prevents a scenario where either a) we delete temporary tables that are still in use, or b) we run this function too early and then there are temporary tables that don’t get deleted.
Eventually this function should be made into an asynchronous function s.t. this operator does not take up a worker slot.
- Parameters:
context (astro.utils.compat.typing.Context) – TI’s Context dictionary
- Return type:
None
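The waiting behavior can be sketched as a poll loop over the other tasks' states. The real method reads task-instance states from Airflow's metadata database; everything below (helper names, states, poll budget) is illustrative:

```python
# Hedged sketch of wait_for_dag_to_finish: poll until every task
# other than the cleanup task itself has finished.

def wait_for_dag_to_finish(get_unfinished_task_ids, own_task_id, max_polls=10):
    for _ in range(max_polls):
        others = [t for t in get_unfinished_task_ids() if t != own_task_id]
        if not others:
            return True  # safe to start dropping temp tables
        # the real operator sleeps here (or fails on purpose and
        # relies on retries under a synchronous executor)
    return False

# Simulate tasks finishing one poll at a time.
pending = ["extract", "load", "cleanup"]
def get_unfinished():
    if len(pending) > 1:
        pending.pop(0)
    return pending

print(wait_for_dag_to_finish(get_unfinished, "cleanup"))  # True
```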
- get_all_task_outputs(context)
In the scenario where we are not given a list of tasks to follow, we will want to gather all temporary tables. To prevent scenarios where we grab objects that are not tables, we only follow up on SQL operators or the dataframe operator, as these are the operators that return temporary tables.
- Parameters:
context (astro.utils.compat.typing.Context) – Context of the DAGRun so we can resolve against the XCOM table
- Return type:
list[astro.table.BaseTable]
- resolve_tables_from_tasks(tasks, context)
For the moment, these are the only two classes that create temporary tables. This function allows us to only resolve xcom for those objects (to reduce how much data is brought into the worker).
We also process these values one at a time so the system can garbage collect non-table objects (otherwise we might run into a situation where we pull in a bunch of dataframes and overwhelm the worker).
- Parameters:
tasks (list[airflow.models.baseoperator.BaseOperator]) – A list of operators from airflow that we can resolve
context (astro.utils.compat.typing.Context) – Context of the DAGRun so we can resolve against the XCOM table
- Return type:
list[astro.table.BaseTable]
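The one-at-a-time resolution described above can be sketched as a loop that pulls each task's output individually, so non-table values (e.g. large dataframes) go out of scope between iterations and can be garbage collected. Class and function names here are stand-ins, not the SDK's internals:

```python
# Hedged sketch of resolve_tables_from_tasks: resolve one XCom value
# at a time and keep only table objects. BaseTable is a stand-in for
# astro.table.BaseTable; each callable stands in for one XCom pull.

class BaseTable:
    def __init__(self, name):
        self.name = name

def resolve_tables(xcom_pulls):
    tables = []
    for pull in xcom_pulls:
        value = pull()  # fetch a single task's output
        if isinstance(value, BaseTable):
            tables.append(value)
        # non-table values are not retained here, so a large dataframe
        # can be collected before the next pull is materialized
    return tables

pulls = [
    lambda: BaseTable("_tmp_x"),
    lambda: list(range(10_000)),  # stands in for a big dataframe
    lambda: BaseTable("_tmp_y"),
]
print([t.name for t in resolve_tables(pulls)])  # ['_tmp_x', '_tmp_y']
```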
- astro.sql.operators.cleanup.cleanup(tables_to_cleanup=None, **kwargs)
Clean up temporary tables once either the DAG or upstream tasks are done
The cleanup operator allows for two possible scenarios: either a user wants to clean up a specific set of tables during the DAG run, or the user wants to ensure that all temporary tables are deleted once the DAG run is finished. The idea here is to ensure that even if a user doesn’t have access to a “temp” schema, astro does not leave hanging tables once execution is done.
- Parameters:
tables_to_cleanup (list[astro.table.BaseTable] | None) – A list of tables to cleanup, defaults to waiting for all upstream tasks to finish
kwargs – Any keyword argument supported by the BaseOperator (e.g. queue, owner)
- Return type:
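The two scenarios reduce to a single decision: drop the explicit list if one was given, otherwise wait for the run to finish and drop every temporary table. A dependency-free sketch (the real function builds a CleanupOperator rather than dropping tables directly; the SDK's own examples typically invoke it as aql.cleanup() as the last task of a DAG):

```python
# Hedged sketch of cleanup()'s two modes. drop order and the
# function name are illustrative; appending to `dropped` stands in
# for the operator's DROP TABLE call.

def cleanup_sketch(tables_to_cleanup, all_temp_tables):
    dropped = []
    targets = tables_to_cleanup if tables_to_cleanup else all_temp_tables
    for table in targets:
        dropped.append(table)  # stands in for DROP TABLE IF EXISTS
    return dropped

all_tmp = ["_tmp_orders", "_tmp_customers"]
print(cleanup_sketch(None, all_tmp))             # ['_tmp_orders', '_tmp_customers']
print(cleanup_sketch(["_tmp_orders"], all_tmp))  # ['_tmp_orders']
```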