astro.databases

Package Contents

Functions

get_class_name(module_ref[, suffix])

Get the class name to be dynamically imported. Class names are expected to be in one of the following formats.

get_dict_with_module_names_to_dot_notations(base_path)

Given a directory, recursively identify which modules exist within it

create_database(conn_id)

Given a conn_id, return an instance of the associated Database class.

Classes

BaseDatabase

Base class to represent all the Database interactions.

Attributes

DEFAULT_CONN_TYPE_TO_MODULE_PATH

CUSTOM_CONN_TYPE_TO_MODULE_PATH

CONN_TYPE_TO_MODULE_PATH

SUPPORTED_DATABASES

astro.databases.get_class_name(module_ref, suffix='Location')

Get the class name to be dynamically imported. Class names are expected to be in one of the following formats. For example, for the module name test and the suffix Abc, the expected class names are:

  1. TESTAbc

  2. TestAbc

Parameters
  • module_ref (Any) – Module from which to get the class implementation (for example, of a location type)

  • suffix (str) – Suffix for the class name

Return type

str
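The naming rule above can be illustrated with a minimal sketch. It is not the library's source: it assumes the module's short name is the last component of its dotted `__name__`, checks the two formats in the order listed above, and raises `ValueError` when neither class exists (the error behaviour is an assumption).

```python
import types
from typing import Any


def get_class_name(module_ref: Any, suffix: str = "Location") -> str:
    """Illustrative sketch: find which expected class name a module exposes."""
    # e.g. "astro.databases.test" -> "test"
    module_name = module_ref.__name__.split(".")[-1]
    candidates = [
        f"{module_name.upper()}{suffix}",  # format 1, e.g. TESTAbc
        f"{module_name.title()}{suffix}",  # format 2, e.g. TestAbc
    ]
    for candidate in candidates:
        if hasattr(module_ref, candidate):
            return candidate
    raise ValueError(f"None of {candidates} found in module {module_ref.__name__}")


# Build a throwaway module that follows format 2.
fake_module = types.ModuleType("astro.databases.test")
fake_module.TestAbc = type("TestAbc", (), {})

print(get_class_name(fake_module, suffix="Abc"))  # TestAbc
```

Under this convention, a module named sqlite looked up with the suffix Database would resolve to SqliteDatabase.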

astro.databases.get_dict_with_module_names_to_dot_notations(base_path)

Given a directory, recursively identify which modules exist within it (ignoring __init__.py and base.py) and return a dictionary that maps each module name to its dot-notation import path.

An example:

    package
    ├── module.py
    └── subpackage
        ├── __init__.py
        └── subpackage_module.py

Running:

    from pathlib import Path
    from astro.utils.path import get_dict_with_module_names_to_dot_notations

    values = get_dict_with_module_names_to_dot_notations(Path("package"))
    print(values)

Prints:

    {'module': 'package.module', 'subpackage_module': 'package.subpackage.subpackage_module'}

Parameters

base_path (pathlib.Path) –

Return type

Dict[str, str]
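A minimal sketch of the same discovery logic using only pathlib. It is an illustrative reimplementation, not the library's code; the set of ignored file names comes from the description above, and prefixing each dotted path with the base directory's name is an assumption that matches the example.

```python
import tempfile
from pathlib import Path
from typing import Dict

IGNORED_FILES = {"__init__.py", "base.py"}


def get_dict_with_module_names_to_dot_notations(base_path: Path) -> Dict[str, str]:
    """Illustrative sketch: map module names under base_path to dotted import paths."""
    result: Dict[str, str] = {}
    for module_path in sorted(base_path.rglob("*.py")):
        if module_path.name in IGNORED_FILES:
            continue
        # package/subpackage/subpackage_module.py -> ("subpackage", "subpackage_module")
        relative_parts = module_path.relative_to(base_path).with_suffix("").parts
        result[module_path.stem] = ".".join((base_path.name, *relative_parts))
    return result


# Recreate the example tree in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    package = Path(tmp) / "package"
    (package / "subpackage").mkdir(parents=True)
    for name in ["module.py", "subpackage/__init__.py", "subpackage/subpackage_module.py"]:
        (package / name).touch()
    print(get_dict_with_module_names_to_dot_notations(package))
    # {'module': 'package.module', 'subpackage_module': 'package.subpackage.subpackage_module'}
```

Keying by the module's stem means two modules with the same file name in different subpackages would overwrite each other; the sketch does not guard against that.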

astro.databases.create_database(conn_id)

Given a conn_id, return an instance of the associated Database class.

Parameters

conn_id (str) – Database connection ID in Airflow

Return type

base.BaseDatabase
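The dispatch this function performs can be sketched as: look up the connection type in a conn-type-to-module mapping, import that module, derive the class name, and instantiate it with the conn_id. This is a hedged sketch, not the library's code. To run without Airflow it takes a hypothetical conn_type argument instead of reading the type from the Airflow connection; the mapping contents, the sketch_dbs.sqlite module name, the "sqlite3" override, and the rule that custom entries override defaults are all assumptions.

```python
import importlib
import sys
import types
from typing import Any, Dict

# Hypothetical mapping contents; the real ones are built by astro.databases.
DEFAULT_CONN_TYPE_TO_MODULE_PATH: Dict[str, str] = {"sqlite": "sketch_dbs.sqlite"}
# Hypothetical override for a connection type that differs from the module name.
CUSTOM_CONN_TYPE_TO_MODULE_PATH: Dict[str, str] = {"sqlite3": "sketch_dbs.sqlite"}
# Assumption: custom entries are layered on top of the defaults.
CONN_TYPE_TO_MODULE_PATH = {**DEFAULT_CONN_TYPE_TO_MODULE_PATH, **CUSTOM_CONN_TYPE_TO_MODULE_PATH}


class SqliteDatabase:
    """Stand-in for a concrete BaseDatabase subclass."""

    def __init__(self, conn_id: str) -> None:
        self.conn_id = conn_id


# Register an in-memory module so importlib can find it without any file on disk.
fake_module = types.ModuleType("sketch_dbs.sqlite")
fake_module.SqliteDatabase = SqliteDatabase
sys.modules["sketch_dbs.sqlite"] = fake_module


def create_database(conn_id: str, conn_type: str) -> Any:
    """Illustrative sketch: resolve conn_type to a module and instantiate its Database class."""
    module = importlib.import_module(CONN_TYPE_TO_MODULE_PATH[conn_type])
    # Naming convention assumed to follow get_class_name(..., suffix="Database").
    class_name = module.__name__.split(".")[-1].title() + "Database"
    database_class = getattr(module, class_name)
    return database_class(conn_id)


database = create_database("my_sqlite_conn", conn_type="sqlite3")
print(type(database).__name__, database.conn_id)  # SqliteDatabase my_sqlite_conn
```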

astro.databases.DEFAULT_CONN_TYPE_TO_MODULE_PATH
astro.databases.CUSTOM_CONN_TYPE_TO_MODULE_PATH
astro.databases.CONN_TYPE_TO_MODULE_PATH
astro.databases.SUPPORTED_DATABASES
class astro.databases.BaseDatabase(conn_id)

Bases: abc.ABC

Base class to represent all the Database interactions.

The goal is to support new databases by adding a new module to the astro/databases directory, without needing to change other modules and classes.

The exception is when the Airflow connection type does not match the name of the new database module. In that case, update the CUSTOM_CONN_TYPE_TO_MODULE_PATH dictionary in astro/databases/__init__.py.

Parameters

conn_id (str) –

_create_schema_statement: str = 'CREATE SCHEMA IF NOT EXISTS {}'
_drop_table_statement: str = 'DROP TABLE IF EXISTS {}'
_create_table_statement: str = 'CREATE TABLE IF NOT EXISTS {} AS {}'
illegal_column_name_chars: List[str] = []
illegal_column_name_chars_replacement: List[str] = []
__repr__()

Return repr(self).

property sql_type
property hook

Return an instance of the database-specific Airflow hook.

Return type

airflow.hooks.dbapi.DbApiHook

property connection

Return a SQLAlchemy connection object for the given database.

Return type

sqlalchemy.engine.base.Connection

property sqlalchemy_engine

Return the SQLAlchemy engine.

Return type

sqlalchemy.engine.base.Engine

run_sql(sql_statement, parameters=None)

Return the results of running a SQL statement.

Whenever possible, this method should be implemented using Airflow hooks, since this simplifies integration with async operators.

Parameters
  • sql_statement (Union[str, sqlalchemy.sql.ClauseElement]) – SQL query to be run against the database

  • parameters (Optional[dict]) – Optional parameters to be used to render the query
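The role of the parameters argument can be illustrated with a stand-in. The real method goes through an Airflow hook or SQLAlchemy; this sketch uses the standard-library sqlite3 module instead, purely to show values being bound by the driver (named :placeholders) rather than interpolated into the SQL string. The function and table names are hypothetical.

```python
import sqlite3
from typing import Optional


def run_sql(connection: sqlite3.Connection, sql_statement: str, parameters: Optional[dict] = None):
    """Illustrative stand-in: execute a statement with bound parameters, return the cursor."""
    # Named placeholders (":name") are filled in by the driver, not by string formatting.
    return connection.execute(sql_statement, parameters or {})


conn = sqlite3.connect(":memory:")
run_sql(conn, "CREATE TABLE users (name TEXT, age INTEGER)")
run_sql(conn, "INSERT INTO users VALUES (:name, :age)", {"name": "ada", "age": 36})
rows = run_sql(conn, "SELECT name FROM users WHERE age > :min_age", {"min_age": 30}).fetchall()
print(rows)  # [('ada',)]
```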

table_exists(table)

Check if a table exists in the database.

Parameters

table (astro.sql.table.Table) – The table whose existence we want to check

Return type

bool
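How the check is done is database-specific. As one hedged example, a SQLite-only version can look the name up in SQLite's sqlite_master catalog; the function below is hypothetical and takes a plain table name instead of an astro Table.

```python
import sqlite3


def table_exists(connection: sqlite3.Connection, table_name: str) -> bool:
    """Illustrative SQLite-only sketch: look the table up in the sqlite_master catalog."""
    cursor = connection.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (table_name,)
    )
    return cursor.fetchone() is not None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
print(table_exists(conn, "users"), table_exists(conn, "orders"))  # True False
```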

static get_merge_initialization_query(parameters)

Handle database-specific logic related to constraints, keeping the calling code database-agnostic.

Parameters

parameters (Tuple) –

Return type

str

static get_table_qualified_name(table)

Return the table's qualified name. This is database-specific: in SQLite, it is the table name, while in Snowflake it is made up of the database, schema and table names.

Parameters

table (astro.sql.table.Table) – The table we want to retrieve the qualified name for.

Return type

str
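The SQLite and Snowflake cases described above can be sketched with two small functions. Table and Metadata here are hypothetical dataclass stand-ins for astro.sql.table.Table and astro.sql.table.Metadata, and skipping unset parts in the Snowflake case is a simplification of this sketch.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Metadata:
    """Hypothetical stand-in for astro.sql.table.Metadata."""

    schema: Optional[str] = None
    database: Optional[str] = None


@dataclass
class Table:
    """Hypothetical stand-in for astro.sql.table.Table."""

    name: str
    metadata: Metadata = field(default_factory=Metadata)


def sqlite_qualified_name(table: Table) -> str:
    # SQLite: the bare table name.
    return table.name


def snowflake_qualified_name(table: Table) -> str:
    # Snowflake: database.schema.table (parts that are not set are skipped here).
    parts = [table.metadata.database, table.metadata.schema, table.name]
    return ".".join(part for part in parts if part)


users = Table("users", Metadata(schema="public", database="analytics"))
print(sqlite_qualified_name(users))  # users
print(snowflake_qualified_name(users))  # analytics.public.users
```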

property default_metadata

Extract the metadata available within the Airflow connection associated with self.conn_id.

Returns

a Metadata instance

Return type

astro.sql.table.Metadata

populate_table_metadata(table)

Given a table, check if it has metadata. If the metadata is missing and the database has metadata, assign the database's metadata to the table. If, after this, the table schema is still not defined, fall back to the user-defined schema. This method modifies the table in place and also returns it.

Parameters

table (astro.sql.table.Table) – Table whose metadata may be changed

Returns

The modified table

Return type

astro.sql.table.Table
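The fallback order described above can be sketched with plain dataclasses. Everything here is a hypothetical stand-in: the Table and Metadata classes, the is_empty helper, the explicit default_metadata argument (the real method uses the database's own default_metadata property), and the DEFAULT_SCHEMA value standing in for the user-defined schema.

```python
from dataclasses import dataclass, field
from typing import Optional

DEFAULT_SCHEMA = "tmp_astro"  # hypothetical stand-in for the user-defined schema


@dataclass
class Metadata:
    schema: Optional[str] = None
    database: Optional[str] = None

    def is_empty(self) -> bool:
        return self.schema is None and self.database is None


@dataclass
class Table:
    name: str
    metadata: Metadata = field(default_factory=Metadata)


def populate_table_metadata(table: Table, default_metadata: Optional[Metadata]) -> Table:
    """Illustrative sketch of the in-place metadata fallback."""
    # 1. The table has no metadata but the connection does: copy it onto the table.
    if table.metadata.is_empty() and default_metadata is not None:
        table.metadata = Metadata(schema=default_metadata.schema, database=default_metadata.database)
    # 2. Still no schema: fall back to the user-defined default.
    if table.metadata.schema is None:
        table.metadata.schema = DEFAULT_SCHEMA
    return table


table = populate_table_metadata(Table("users"), Metadata(database="analytics"))
print(table.metadata)  # Metadata(schema='tmp_astro', database='analytics')
```

The sketch copies the connection's metadata instead of sharing the object, so later in-place changes to one table do not leak into the defaults.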

create_table(table)

Create a SQL table. For this method to work, the table instance must contain columns.

Parameters

table (astro.sql.table.Table) – The table to be created.

Return type

None

create_table_from_select_statement(statement, target_table, parameters=None)

Export the result rows of a query statement into another table.

Parameters
  • statement (str) – SQL query statement

  • target_table (astro.sql.table.Table) – Destination table where results will be recorded.

  • parameters (Optional[dict]) – (Optional) parameters to be used to render the SQL query

Return type

None
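One way to picture this operation is to fill the _create_table_statement template ('CREATE TABLE IF NOT EXISTS {} AS {}') with the target table and the query, while keeping query values as bound parameters. The sketch below assumes str.format is used to fill the template and runs against SQLite via the standard-library sqlite3 module; the function signature is hypothetical.

```python
import sqlite3
from typing import Optional

# Same template as BaseDatabase._create_table_statement.
CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS {} AS {}"


def create_table_from_select_statement(
    connection: sqlite3.Connection,
    statement: str,
    target_table_name: str,
    parameters: Optional[dict] = None,
) -> None:
    """Illustrative SQLite sketch: materialise a query's result rows into a new table."""
    # The identifier is formatted into the SQL; query values stay as bound parameters.
    sql = CREATE_TABLE_STATEMENT.format(target_table_name, statement)
    connection.execute(sql, parameters or {})


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, age INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [("ada", 36), ("alan", 41), ("kid", 9)])
create_table_from_select_statement(
    conn, "SELECT name FROM users WHERE age >= :min_age", "adults", {"min_age": 18}
)
print(conn.execute("SELECT name FROM adults ORDER BY name").fetchall())  # [('ada',), ('alan',)]
```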

drop_table(table)

Delete a SQL table, if it exists.

Parameters

table (astro.sql.table.Table) – The table to be deleted.

Return type

None

load_file_to_table(source_file, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)

Load the content of the source file to the target database table. If the table already exists, append or replace the content, depending on the value of if_exists.

Parameters
  • source_file (astro.files.File) – Local or remote filepath (e.g. File("/tmp/sample_data.csv"))

  • target_table (astro.sql.table.Table) – Table in which the file will be loaded

  • if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists

  • chunk_size (int) – Number of rows to write in each batch.

Return type

None
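The interplay of if_exists and chunk_size can be sketched with the standard library only: read a CSV, drop the table first when replacing, then insert rows in batches of chunk_size. This is a stand-in, not the library's loader; the function name, the DEFAULT_CHUNK_SIZE value, the SQLite target, and storing every value as text are assumptions of the sketch.

```python
import csv
import io
import sqlite3
from itertools import islice
from typing import TextIO

DEFAULT_CHUNK_SIZE = 1000  # hypothetical value, for illustration only


def load_csv_to_table(
    source: TextIO,
    connection: sqlite3.Connection,
    table_name: str,
    if_exists: str = "replace",
    chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> None:
    """Illustrative sketch: load CSV rows in batches, replacing or appending."""
    if if_exists not in ("replace", "append"):
        raise ValueError(f"Unsupported if_exists value: {if_exists}")
    reader = csv.reader(source)
    header = next(reader)
    # Identifiers are assumed trusted here; values are bound as parameters.
    columns = ", ".join(f'"{column}"' for column in header)
    if if_exists == "replace":
        connection.execute(f'DROP TABLE IF EXISTS "{table_name}"')
    connection.execute(f'CREATE TABLE IF NOT EXISTS "{table_name}" ({columns})')
    placeholders = ", ".join("?" for _ in header)
    insert = f'INSERT INTO "{table_name}" VALUES ({placeholders})'
    # Write at most chunk_size rows per executemany call.
    while batch := list(islice(reader, chunk_size)):
        connection.executemany(insert, batch)
    connection.commit()


conn = sqlite3.connect(":memory:")
data = "name,age\nada,36\nalan,41\nkid,9\n"
load_csv_to_table(io.StringIO(data), conn, "users", chunk_size=2)
load_csv_to_table(io.StringIO(data), conn, "users", if_exists="append")
print(conn.execute('SELECT COUNT(*) FROM "users"').fetchone())  # (6,)
```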

load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)

Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.

Parameters
  • source_dataframe (pandas.DataFrame) – The dataframe whose contents will be loaded

  • target_table (astro.sql.table.Table) – Table in which the dataframe will be loaded

  • if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.

  • chunk_size (int) – Number of rows to write in each batch.

Return type

None

append_table(source_table, target_table, source_to_target_columns_map)

Append the source table rows into a destination table.

Parameters
  • source_table (astro.sql.table.Table) – Contains the rows to be appended to the target_table

  • target_table (astro.sql.table.Table) – Contains the destination table in which the rows will be appended

  • source_to_target_columns_map (Dict[str, str]) – Dict of source_table columns names to target_table columns names

Return type

None
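A natural way to honour source_to_target_columns_map is an INSERT ... SELECT whose column lists come from the map's keys and values, which stay aligned because they are read from the same dict. The sketch below is an assumption about how such a statement could be built, not the library's implementation, and it leaves identifiers unquoted for readability.

```python
import sqlite3
from typing import Dict


def build_append_statement(
    source_table: str, target_table: str, source_to_target_columns_map: Dict[str, str]
) -> str:
    """Illustrative sketch: INSERT ... SELECT following a source -> target column map."""
    source_columns = ", ".join(source_to_target_columns_map.keys())
    target_columns = ", ".join(source_to_target_columns_map.values())
    return f"INSERT INTO {target_table} ({target_columns}) SELECT {source_columns} FROM {source_table}"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging (full_name TEXT, years INTEGER)")
conn.execute("CREATE TABLE people (name TEXT, age INTEGER)")
conn.execute("INSERT INTO staging VALUES ('ada', 36)")
sql = build_append_statement("staging", "people", {"full_name": "name", "years": "age"})
print(sql)  # INSERT INTO people (name, age) SELECT full_name, years FROM staging
conn.execute(sql)
print(conn.execute("SELECT * FROM people").fetchall())  # [('ada', 36)]
```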

abstract merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')

Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.

Parameters
  • source_table (astro.sql.table.Table) – Contains the rows to be merged to the target_table

  • target_table (astro.sql.table.Table) – Contains the destination table in which the rows will be merged

  • source_to_target_columns_map (Dict[str, str]) – Dict of source_table column names to target_table column names

  • target_conflict_columns (List[str]) – List of columns on which conflicts are expected when combining the tables

  • if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.

Return type

None
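Since merge_table is abstract, each database supplies its own SQL. As one hedged illustration, a SQLite version could emit an UPSERT (INSERT ... ON CONFLICT, available since SQLite 3.24). Only the 'exception' strategy is named above; the 'ignore' and 'update' values are assumptions, as is treating source_to_target_columns_map as mapping source columns to target columns, as its name suggests. The target conflict columns must be backed by a unique constraint.

```python
import sqlite3
from typing import Dict, List


def build_merge_statement(
    source_table: str,
    target_table: str,
    source_to_target_columns_map: Dict[str, str],
    target_conflict_columns: List[str],
    if_conflicts: str = "exception",
) -> str:
    """Illustrative SQLite sketch of a merge (upsert) statement."""
    source_columns = ", ".join(source_to_target_columns_map.keys())
    target_columns = ", ".join(source_to_target_columns_map.values())
    conflict_columns = ", ".join(target_conflict_columns)
    # "WHERE true" avoids SQLite's parsing ambiguity between the SELECT and ON CONFLICT.
    statement = (
        f"INSERT INTO {target_table} ({target_columns}) "
        f"SELECT {source_columns} FROM {source_table} WHERE true"
    )
    if if_conflicts == "ignore":
        statement += f" ON CONFLICT ({conflict_columns}) DO NOTHING"
    elif if_conflicts == "update":
        updates = ", ".join(
            f"{column} = excluded.{column}"
            for column in source_to_target_columns_map.values()
            if column not in target_conflict_columns
        )
        statement += f" ON CONFLICT ({conflict_columns}) DO UPDATE SET {updates}"
    # "exception": no ON CONFLICT clause, so a duplicate key raises an error.
    return statement


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (name TEXT PRIMARY KEY, age INTEGER)")
conn.execute("CREATE TABLE staging (full_name TEXT, years INTEGER)")
conn.execute("INSERT INTO people VALUES ('ada', 35)")
conn.execute("INSERT INTO staging VALUES ('ada', 36), ('alan', 41)")
mapping = {"full_name": "name", "years": "age"}
conn.execute(build_merge_statement("staging", "people", mapping, ["name"], "update"))
print(conn.execute("SELECT * FROM people ORDER BY name").fetchall())  # [('ada', 36), ('alan', 41)]
```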

get_sqla_table(table)

Return a SQLAlchemy table instance.

Parameters

table (astro.sql.table.Table) – Astro Table to be converted to SQLAlchemy table instance

Return type

sqlalchemy.sql.schema.Table

export_table_to_pandas_dataframe(source_table)

Copy the content of a table to an in-memory Pandas dataframe.

Parameters

source_table (astro.sql.table.Table) – An existing table in the database

Return type

pandas.DataFrame

export_table_to_file(source_table, target_file, if_exists='exception')

Copy the content of a table to a target file of supported type, in a supported location.

Parameters
  • source_table (astro.sql.table.Table) – An existing table in the database

  • target_file (astro.files.File) – The file to which the content of the table will be written

  • if_exists (astro.constants.ExportExistsStrategy) – Strategy to be used if the target file already exists. Defaults to 'exception'.

Return type

None

create_schema_if_needed(schema)

Check if the expected schema exists in the database. If it does not, attempt to create it.

Parameters

schema (Optional[str]) – Database schema: a namespace that contains named objects (tables, functions, etc.)

Return type

None

abstract schema_exists(schema)

Check if a schema exists in the database.

Parameters

schema (str) – Database schema: a namespace that contains named objects (tables, functions, etc.)

Return type

bool
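How create_schema_if_needed can rely on the abstract schema_exists is sketched below with a trimmed-down, hypothetical analogue of BaseDatabase built on abc.ABC. The toy subclass, the choice to make run_sql abstract, and treating a None schema as a no-op are assumptions of this sketch; the CREATE SCHEMA template is the one listed in the class attributes.

```python
import abc
from typing import List, Optional

CREATE_SCHEMA_STATEMENT = "CREATE SCHEMA IF NOT EXISTS {}"  # same template as BaseDatabase


class SketchDatabase(abc.ABC):
    """Hypothetical, trimmed-down analogue of BaseDatabase."""

    @abc.abstractmethod
    def run_sql(self, sql_statement: str) -> None:
        ...

    @abc.abstractmethod
    def schema_exists(self, schema: str) -> bool:
        ...

    def create_schema_if_needed(self, schema: Optional[str]) -> None:
        # Assumption: a missing (None) schema means there is nothing to create.
        if schema and not self.schema_exists(schema):
            self.run_sql(CREATE_SCHEMA_STATEMENT.format(schema))


class RecordingDatabase(SketchDatabase):
    """Toy subclass that keeps schemas in memory and records executed SQL."""

    def __init__(self) -> None:
        self.schemas = {"public"}
        self.executed: List[str] = []

    def run_sql(self, sql_statement: str) -> None:
        self.executed.append(sql_statement)
        # Naive: treat the last word of the statement as the new schema name.
        self.schemas.add(sql_statement.rsplit(" ", 1)[-1])

    def schema_exists(self, schema: str) -> bool:
        return schema in self.schemas


db = RecordingDatabase()
db.create_schema_if_needed("public")  # already exists: no SQL is run
db.create_schema_if_needed("staging")  # missing: created
db.create_schema_if_needed("staging")  # now exists: no SQL is run
print(db.executed)  # ['CREATE SCHEMA IF NOT EXISTS staging']
```

Because the base class is abstract, instantiating SketchDatabase directly raises TypeError; only subclasses that implement every abstract method can be created.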

get_sqlalchemy_template_table_identifier_and_parameter(table, jinja_table_identifier)

During the conversion from a Jinja-templated SQL query to a SQLAlchemy query, each Jinja table identifier must be converted into a safe, SQLAlchemy-compatible table identifier.

For example, the query:

    sql_statement = "SELECT * FROM {{input_table}};"
    parameters = {"input_table": Table(name="user_defined_table", metadata=Metadata(schema="some_schema"))}

Can become (depending on the database):

    sql_statement = "SELECT * FROM some_schema.user_defined_table;"
    parameters = {"input_table": "user_defined_table"}

Since the table value is templated, there is a safety concern (e.g. SQL injection). We recommend consulting the database's documentation for best practices, for example Snowflake: https://docs.snowflake.com/en/sql-reference/identifier-literal.html

Parameters
  • table (astro.sql.table.Table) – The table object we want to generate a safe table identifier for

  • jinja_table_identifier (str) – The name used within the Jinja template to represent this table

Returns

A tuple of the value that replaces the Jinja table identifier in the query and the value to be used for the corresponding parameter

Return type

Tuple[str, str]
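The two-value return can be illustrated with two hypothetical strategies: one writes the qualified name directly into the query (reproducing the example above), the other keeps a bind parameter wrapped in Snowflake-style IDENTIFIER(...) so the driver supplies the name. The Table stand-in, both strategy functions, and the render helper are assumptions, and the placeholder substitution is a plain string replace, not real Jinja rendering.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Tuple


@dataclass
class Table:
    """Hypothetical stand-in for astro.sql.table.Table."""

    name: str
    schema: Optional[str] = None

    @property
    def qualified_name(self) -> str:
        return f"{self.schema}.{self.name}" if self.schema else self.name


Strategy = Callable[[Table, str], Tuple[str, str]]


def inline_identifier(table: Table, jinja_table_identifier: str) -> Tuple[str, str]:
    # Write the qualified name into the query; keep the bare name as the parameter value.
    return table.qualified_name, table.name


def snowflake_style_identifier(table: Table, jinja_table_identifier: str) -> Tuple[str, str]:
    # Keep a bind parameter in the query and let IDENTIFIER() resolve it.
    return f"IDENTIFIER(:{jinja_table_identifier})", table.qualified_name


def render(sql: str, tables: Dict[str, Table], strategy: Strategy) -> Tuple[str, Dict[str, str]]:
    """Replace each {{identifier}} placeholder (plain string replace, not real Jinja)."""
    parameters: Dict[str, str] = {}
    for identifier, table in tables.items():
        replacement, value = strategy(table, identifier)
        sql = sql.replace("{{" + identifier + "}}", replacement)
        parameters[identifier] = value
    return sql, parameters


tables = {"input_table": Table(name="user_defined_table", schema="some_schema")}
print(render("SELECT * FROM {{input_table}};", tables, inline_identifier))
# ('SELECT * FROM some_schema.user_defined_table;', {'input_table': 'user_defined_table'})
print(render("SELECT * FROM {{input_table}};", tables, snowflake_style_identifier))
# ('SELECT * FROM IDENTIFIER(:input_table);', {'input_table': 'some_schema.user_defined_table'})
```

The second strategy never splices the table name into the SQL text, which is the kind of safeguard the Snowflake documentation linked above describes.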