astro.sql

Subpackages

Submodules

Package Contents

Functions

export_file(input_data, output_file[, if_exists, task_id])

Export the input data (a Table or dataframe) into a file. Returns an XComArg.

load_file(input_file[, output_table, task_id, ...])

Load a file or bucket into either a SQL table or a pandas dataframe.

transform([python_callable, multiple_outputs, ...])

Given a python function that returns a SQL statement and (optional) tables, execute the SQL statement and output the result into a SQL table.

run_raw_sql([python_callable, multiple_outputs, ...])

Given a python function that returns a SQL statement and (optional) tables, execute the SQL statement and, if a handler is provided, return its result.

cleanup([tables_to_cleanup])

Clean up temporary tables once either the DAG or upstream tasks are done.

append(*, source_table, target_table[, columns])

Append the source table rows into a destination table.

merge(*, target_table, source_table, columns, ...)

Merge the source table rows into a destination table.

truncate(table, **kwargs)

Truncate a table.

dataframe([python_callable, multiple_outputs, ...])

This decorator allows users to write python functions while treating SQL tables as dataframes.

Classes

AppendOperator

Append the source table rows into a destination table.

CleanupOperator

Clean up temporary tables at the end of a DAG run. Temporary tables are the ones generated by the SDK or whose names start with _tmp.

DataframeOperator

MergeOperator

Merge the source table rows into a destination table.

RawSQLOperator

Given a SQL statement, (optional) tables and an (optional) function, execute the SQL statement and apply the function to the results.

TransformOperator

Given a SQL statement and (optional) tables, execute the SQL statement and output the result into a SQL table.

TruncateOperator

Airflow Operator for truncating SQL tables.

Table

Holds the information necessary to access a SQL table.

Attributes

MergeConflictStrategy

APPEND_COLUMN_TYPE

MERGE_COLUMN_TYPE

astro.sql.export_file(input_data, output_file, if_exists='exception', task_id=None, **kwargs)

Export the input data (a Table or dataframe) into a file. Returns an XComArg.

Returns an XComArg object of type File which matches the output_file parameter.

This will allow users to perform further actions with the exported file.

e.g.

    with sample_dag:
        table = aql.load_file(input_file=File(path=data_path), output_table=test_table)
        exported_file = aql.export_file(
            input_data=table,
            output_file=File(path="/tmp/saved_df.csv"),
            if_exists="replace",
        )
        res_df = aql.load_file(input_file=exported_file)

Parameters
  • output_file (astro.files.File) – Path and conn_id

  • input_data (Union[astro.sql.table.Table, pandas.DataFrame]) – Input table / dataframe

  • if_exists (astro.constants.ExportExistsStrategy) – Whether to overwrite the file if it exists. Default "exception"

  • task_id (Optional[str]) – task id, optional

  • kwargs (Any) –

Return type

airflow.models.xcom_arg.XComArg

astro.sql.load_file(input_file, output_table=None, task_id=None, if_exists='replace', ndjson_normalize_sep='_', **kwargs)

Load a file or bucket into either a SQL table or a pandas dataframe.

Parameters
  • input_file (astro.files.File) – File path and conn_id for object stores

  • output_table (Optional[astro.sql.table.Table]) – Table to create

  • task_id (Optional[str]) – task id, optional

  • if_exists (astro.constants.LoadExistStrategy) – What to do if the table already exists; the default ("replace") overrides the existing table. Options: fail, replace, append

  • ndjson_normalize_sep (str) – separator used to normalize nested ndjson. Example: {"a": {"b": "c"}} results in the column "a_b" when ndjson_normalize_sep = "_"

  • kwargs (Any) –

Return type

airflow.models.xcom_arg.XComArg
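A minimal usage sketch; the file path, connection ids, and table name below are placeholders, not part of the API:

    from datetime import datetime

    from airflow import DAG
    from astro import sql as aql
    from astro.files import File
    from astro.sql.table import Table

    with DAG("load_example", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
        # Load a CSV from object storage into a SQL table,
        # replacing the table if it already exists.
        movies = aql.load_file(
            input_file=File(path="s3://my-bucket/movies.csv", conn_id="aws_default"),
            output_table=Table(name="movies", conn_id="postgres_default"),
            if_exists="replace",
        )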

astro.sql.transform(python_callable=None, multiple_outputs=None, conn_id='', parameters=None, database=None, schema=None, handler=None, **kwargs)

Given a python function that returns a SQL statement and (optional) tables, execute the SQL statement and output the result into a SQL table.

Use this function as a decorator like so:

    @transform
    def my_sql_statement(table1: Table, table2: Table) -> Table:
        return "SELECT * FROM {{table1}} JOIN {{table2}}"

In this example, by identifying the parameters as Table objects, astro knows to automatically convert those objects into tables (if they are, for example, a dataframe). Any type besides table will lead astro to assume you do not want the parameter converted.

Parameters
  • python_callable (Optional[Callable]) –

  • multiple_outputs (Optional[bool]) –

  • conn_id (str) –

  • parameters (Optional[Union[Mapping, Iterable]]) –

  • database (Optional[str]) –

  • schema (Optional[str]) –

  • handler (Optional[Callable]) –

  • kwargs (Any) –

Returns

Transform functions return a Table object that can be passed to future tasks. This table will be either an auto-generated temporary table, or will overwrite a table given in the output_table parameter.

Return type

airflow.decorators.base.TaskDecorator
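A sketch of wiring a transform task into a DAG; sample_dag, data_path, and the table names are placeholders. The Table parameters of the decorated function are templated into the SQL, and the result lands in output_table:

    @aql.transform
    def top_five(input_table: Table):
        return "SELECT * FROM {{ input_table }} LIMIT 5"

    with sample_dag:
        movies = aql.load_file(
            input_file=File(path=data_path),
            output_table=Table(conn_id="postgres_default"),
        )
        # Store the result in a named table rather than an
        # auto-generated temporary one.
        top_five(movies, output_table=Table(name="top_movies"))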

astro.sql.run_raw_sql(python_callable=None, multiple_outputs=None, conn_id='', parameters=None, database=None, schema=None, handler=None, **kwargs)

Given a python function that returns a SQL statement and (optional) tables, execute the SQL statement and, if a handler is provided, return its result.

Use this function as a decorator like so:

    @run_raw_sql
    def my_sql_statement(table1: Table) -> Table:
        return "DROP TABLE {{table1}}"

In this example, by identifying parameters as Table objects, astro knows to automatically convert those objects into tables (if they are, for example, a dataframe). Any type besides table will lead astro to assume you do not want the parameter converted.

Please note that the run_raw_sql function will not create a temporary table. It will either return the result of a provided handler function or it will not return anything at all.

Parameters
  • python_callable (Optional[Callable]) –

  • multiple_outputs (Optional[bool]) –

  • conn_id (str) –

  • parameters (Optional[Union[Mapping, Iterable]]) –

  • database (Optional[str]) –

  • schema (Optional[str]) –

  • handler (Optional[Callable]) –

  • kwargs (Any) –

Returns

Return type

airflow.decorators.base.TaskDecorator
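A sketch of the handler mechanism; count_handler, the connection id, and the query are illustrative, and the exact type of the object passed to the handler depends on the database backend:

    def count_handler(results):
        # Receives the database response for the executed statement;
        # whatever is returned here becomes the task's result.
        return results.fetchall()

    @aql.run_raw_sql(conn_id="postgres_default", handler=count_handler)
    def count_rows(input_table: Table):
        return "SELECT COUNT(*) FROM {{ input_table }}"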

astro.sql.cleanup(tables_to_cleanup=None, **kwargs)

Clean up temporary tables once either the DAG or upstream tasks are done.

The cleanup operator allows for two possible scenarios: either a user wants to clean up a specific set of tables during the DAG run, or the user wants to ensure that all temporary tables are deleted once the DAG run is finished. The idea here is to ensure that even if a user doesn't have access to a "temp" schema, astro does not leave hanging tables once execution is done.

Parameters
  • tables_to_cleanup (Optional[List[table.Table]]) – A list of tables to cleanup, defaults to waiting for all upstream tasks to finish

  • kwargs

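In practice this is usually added as a final, argument-free task, e.g. (a sketch; sample_dag is a placeholder):

    with sample_dag:
        # ... tasks that create temporary tables ...
        # With no tables_to_cleanup passed, cleanup waits for all other
        # tasks to finish and then drops every temporary table it finds.
        aql.cleanup()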

astro.sql.append(*, source_table, target_table, columns=None, **kwargs)

Append the source table rows into a destination table.

Parameters
  • source_table (table.Table) – Contains the rows to be appended to the target_table (templated)

  • target_table (table.Table) – Contains the destination table in which the rows will be appended (templated)

  • columns (operators.append.APPEND_COLUMN_TYPE) – List/tuple of columns if the column names in the source and target tables are the same. If they differ, pass a dictionary mapping source_table column names to target_table column names. Examples: ["sell", "list"] or {"s_sell": "t_sell", "s_list": "t_list"}

  • kwargs (Any) –
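e.g., with placeholder tables and connection ids, showing both forms of the columns argument:

    # Same column names in source and target:
    aql.append(
        source_table=Table(name="daily_sales", conn_id="postgres_default"),
        target_table=Table(name="all_sales", conn_id="postgres_default"),
        columns=["sell", "list"],
    )
    # Different column names: map source columns to target columns, e.g.
    # columns={"s_sell": "t_sell", "s_list": "t_list"}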

astro.sql.merge(*, target_table, source_table, columns, target_conflict_columns, if_conflicts, **kwargs)

Merge the source table rows into a destination table.

Parameters
  • source_table (table.Table) – Contains the rows to be merged to the target_table (templated)

  • target_table (table.Table) – Contains the destination table in which the rows will be merged (templated)

  • columns (operators.merge.MERGE_COLUMN_TYPE) – List/tuple of columns if the column names in the source and target tables are the same. If they differ, pass a dictionary mapping source_table column names to target_table column names. Examples: ["sell", "list"] or {"s_sell": "t_sell", "s_list": "t_list"}

  • target_conflict_columns (List[str]) – List of columns where a conflict is expected when combining the tables

  • if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.

  • kwargs (Any) –
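A sketch with placeholder tables and connection ids, assuming "update" is one of the accepted MergeConflictStrategy values:

    aql.merge(
        source_table=Table(name="staging_sales", conn_id="postgres_default"),
        target_table=Table(name="sales", conn_id="postgres_default"),
        columns=["sell", "list"],
        # Rows colliding on this column trigger the conflict strategy.
        target_conflict_columns=["sell"],
        if_conflicts="update",
    )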

astro.sql.truncate(table, **kwargs)

Truncate a table.

Parameters
  • table (table.Table) – Table to be truncated

  • kwargs (Any) –

Return type

operators.truncate.TruncateOperator
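e.g. (table name and connection id are placeholders):

    aql.truncate(table=Table(name="stale_events", conn_id="postgres_default"))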

astro.sql.dataframe(python_callable=None, multiple_outputs=None, conn_id='', database=None, schema=None, task_id=None, identifiers_as_lower=True)

This decorator allows users to write python functions while treating SQL tables as dataframes.

This decorator allows a user to run python functions in Airflow, with the benefit that SQL tables are automatically converted into dataframes and the resulting dataframes can automatically be used in astro.sql functions.

Parameters
  • python_callable (Optional[Callable]) –

  • multiple_outputs (Optional[bool]) –

  • conn_id (str) –

  • database (Optional[str]) –

  • schema (Optional[str]) –

  • task_id (Optional[str]) –

  • identifiers_as_lower (Optional[bool]) –

Return type

Callable[Ellipsis, pandas.DataFrame]
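A sketch; the function body sees a pandas DataFrame even though the caller passes a SQL Table (sample_dag and the upstream movies table are placeholders):

    import pandas as pd

    @aql.dataframe
    def titles_per_year(df: pd.DataFrame):
        # "df" arrives as a pandas DataFrame loaded from the SQL table.
        return df.groupby("year").count()

    with sample_dag:
        counts = titles_per_year(movies)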

astro.sql.MergeConflictStrategy
astro.sql.APPEND_COLUMN_TYPE
astro.sql.MERGE_COLUMN_TYPE
class astro.sql.AppendOperator(source_table, target_table, columns=None, task_id='', **kwargs)

Bases: airflow.models.baseoperator.BaseOperator

Append the source table rows into a destination table.

Parameters
  • source_table (astro.sql.table.Table) – Contains the rows to be appended to the target_table (templated)

  • target_table (astro.sql.table.Table) – Contains the destination table in which the rows will be appended (templated)

  • columns (APPEND_COLUMN_TYPE) – List/tuple of columns if the column names in the source and target tables are the same. If they differ, pass a dictionary mapping source_table column names to target_table column names. Examples: ["sell", "list"] or {"s_sell": "t_sell", "s_list": "t_list"}

  • task_id (str) –

  • kwargs (Any) –

template_fields = ['source_table', 'target_table']
execute(context)
Parameters

context (dict) –

Return type

astro.sql.table.Table

class astro.sql.CleanupOperator(*, tables_to_cleanup=None, task_id='', retries=3, retry_delay=timedelta(seconds=10), run_sync_mode=False, **kwargs)

Bases: airflow.models.baseoperator.BaseOperator

Clean up temporary tables at the end of a DAG run. Temporary tables are the ones that are generated by the SDK (when you do not pass a name arg to Table) or the ones whose names start with _tmp.

By default, if no tables are passed, the task will wait for all other tasks to run before deleting all temporary tables.

If using a synchronous executor (e.g. SequentialExecutor and DebugExecutor), this task will initially fail on purpose, so the executor is unblocked and can run other tasks. Users may have to define custom values for retries and retry_delay if they intend to use one of these executors.

Parameters
  • tables_to_cleanup (Optional[List[astro.sql.table.Table]]) – List of tables to drop at the end of the DAG run

  • task_id (str) – Optional custom task id

  • retries (int) – The number of retries that should be performed before failing the task. Very relevant if using a synchronous executor. The default is 3.

  • retry_delay (datetime.timedelta) – Delay between running retries. Very relevant if using a synchronous executor. The default is 10s.

  • run_sync_mode (bool) – Whether to wait for the DAG to finish or not. Set to False if you want to immediately clean all DAGs. Note that if you supply anything to tables_to_cleanup, this argument is ignored.

template_fields = ['tables_to_cleanup']
execute(context)
Parameters

context (airflow.utils.context.Context) –

Return type

None

drop_table(table)
Parameters

table (astro.sql.table.Table) –

Return type

None

_is_dag_running(task_instances)

Given a list of task instances, determine whether the DAG (minus the current cleanup task) is still running.

Parameters

task_instances (List[airflow.models.taskinstance.TaskInstance]) –

Returns

boolean to show if all tasks besides this one have completed

Return type

bool

wait_for_dag_to_finish(context)

In the event that we are not given any tables, we will want to wait for all other tasks to finish before we delete temporary tables. This prevents a scenario where either a) we delete temporary tables that are still in use, or b) we run this function too early and then there are temporary tables that don’t get deleted.

Eventually this function should be made asynchronous so that this operator does not take up a worker slot.

Parameters

context (airflow.utils.context.Context) – TI’s Context dictionary

Return type

None

classmethod _is_single_worker_mode(current_dagrun)
Parameters

current_dagrun (airflow.models.dagrun.DagRun) –

Return type

bool

static _get_executor_from_job_id(job_id)
Parameters

job_id (int) –

Return type

Optional[str]

get_all_task_outputs(context)

In the scenario where we are not given a list of tasks to follow, we will want to gather all temporary tables. To prevent scenarios where we grab objects that are not tables, we only follow up on SQL operators or the dataframe operator, as these are the operators that return temporary tables.

Parameters

context (airflow.utils.context.Context) – Context of the DAGRun so we can resolve against the XCOM table

Return type

List[astro.sql.table.Table]

resolve_tables_from_tasks(tasks, context)

For the moment, the transform and dataframe operators are the only two classes that create temporary tables. This function allows us to only resolve xcom for those objects (to reduce how much data is brought into the worker).

We also process these values one at a time so the system can garbage collect non-table objects (otherwise we might run into a situation where we pull in a bunch of dataframes and overwhelm the worker).

Parameters
  • tasks (List[airflow.models.baseoperator.BaseOperator]) –

  • context (airflow.utils.context.Context) –

Return type

List[astro.sql.table.Table]

class astro.sql.DataframeOperator(conn_id=None, database=None, schema=None, identifiers_as_lower=True, **kwargs)

Bases: airflow.decorators.base.DecoratedOperator

Parameters
  • conn_id (Optional[str]) –

  • database (Optional[str]) –

  • schema (Optional[str]) –

  • identifiers_as_lower (bool) –

execute(context)
Parameters

context (Dict) –

Return type

Union[astro.sql.table.Table, pandas.DataFrame]

class astro.sql.MergeOperator(*, target_table, source_table, columns, if_conflicts, target_conflict_columns, task_id='', **kwargs)

Bases: airflow.models.baseoperator.BaseOperator

Merge the source table rows into a destination table.

Parameters
  • source_table (astro.sql.table.Table) – Contains the rows to be merged to the target_table (templated)

  • target_table (astro.sql.table.Table) – Contains the destination table in which the rows will be merged (templated)

  • columns (MERGE_COLUMN_TYPE) – List/tuple of columns if the column names in the source and target tables are the same. If they differ, pass a dictionary mapping source_table column names to target_table column names. Examples: ["sell", "list"] or {"s_sell": "t_sell", "s_list": "t_list"}

  • target_conflict_columns (List[str]) – List of columns where a conflict is expected when combining the tables

  • if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.

  • task_id (str) –

  • kwargs (Any) –

template_fields = ['target_table', 'source_table']
execute(context)
Parameters

context (dict) –

Return type

astro.sql.table.Table

class astro.sql.RawSQLOperator

Bases: astro.sql.operators.base.BaseSQLOperator

Given a SQL statement, (optional) tables and an (optional) function, execute the SQL statement and apply the function to the results, returning the result of the function.

Disclaimer: this could potentially trash the XCom Database, depending on the XCom backend used and on the SQL statement/function declared by the user.

execute(context)
Parameters

context (Dict) –

Return type

Any

class astro.sql.TransformOperator

Bases: astro.sql.operators.base.BaseSQLOperator

Given a SQL statement and (optional) tables, execute the SQL statement and output the result into a SQL table.

execute(context)
Parameters

context (Dict) –

class astro.sql.TruncateOperator(table, task_id='', **kwargs)

Bases: airflow.models.BaseOperator

Airflow Operator for truncating SQL tables.

Parameters
  • table (astro.sql.table.Table) – Table to be truncated

  • task_id (str) –

  • kwargs (Any) –
execute(context)

Method run when the Airflow runner calls the operator.

Parameters

context (Dict) –

Return type

None

class astro.sql.Table

Holds the information necessary to access a SQL table. It is agnostic to the database type. If no name is given, it auto-generates a name for the Table and considers it temporary.

Temporary table names are prefixed with TEMP_PREFIX.
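e.g. (the table name and connection id are placeholders):

    # Named table: refers to a persistent table in the database.
    customers = Table(name="customers", conn_id="postgres_default")

    # No name given: a unique name is auto-generated and the table is
    # treated as temporary, making it eligible for cleanup.
    scratch = Table(conn_id="postgres_default")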

template_fields = ['name']
conn_id: str
name: str
_name: str
metadata: Metadata
columns: List[sqlalchemy.Column]
temp: bool = False
__post_init__()
Return type

None

_create_unique_table_name(prefix='')

If a table is instantiated without a name, create a unique name for it. This new name should be compatible with all supported databases.

Parameters

prefix (str) –

Return type

str

create_similar_table()

Create a new table with a unique name but with the same metadata.

Return type

Table

property sqlalchemy_metadata

Return the SQLAlchemy metadata for the given table.

Return type

sqlalchemy.MetaData

property name

Return either the user-defined name or an auto-generated one.

Return type

str