astro.databases.mysql

Module Contents

Classes

MysqlDatabase

Base class to represent all the Database interactions.

Attributes

DEFAULT_CONN_ID

astro.databases.mysql.DEFAULT_CONN_ID
class astro.databases.mysql.MysqlDatabase(conn_id=DEFAULT_CONN_ID, table=None, load_options=None)

Bases: astro.databases.base.BaseDatabase

Base class to represent all the Database interactions.

The goal is to be able to support new databases by adding a new module to the astro/databases directory, without the need of changing other modules and classes.

The exception is if the Airflow connection type does not match the new Database module name. In that case, we should update the dictionary CUSTOM_CONN_TYPE_TO_MODULE_PATH available at astro/databases/__init__.py.
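As a rough illustration of the lookup described above, the sketch below resolves a connection type to a module path and falls back to the connection type itself when no override is registered. The dictionary entry, the DEFAULT_PACKAGE constant, and the resolve_module_path helper are hypothetical stand-ins; they are not the actual contents of astro/databases/__init__.py.

```python
# Hypothetical override table: connection types whose name differs from the module name.
CUSTOM_CONN_TYPE_TO_MODULE_PATH = {
    "gcpbigquery": "astro.databases.google.bigquery",  # assumed entry, for illustration only
}

# Assumed package under which database modules live.
DEFAULT_PACKAGE = "astro.databases"


def resolve_module_path(conn_type: str) -> str:
    """Return the module path expected to implement the given connection type."""
    # Use the override if one is registered; otherwise assume the module
    # is named after the connection type.
    return CUSTOM_CONN_TYPE_TO_MODULE_PATH.get(
        conn_type, f"{DEFAULT_PACKAGE}.{conn_type}"
    )
```

With this scheme, a connection type of "mysql" needs no override because it already matches the module name.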

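Parameters:
  • conn_id – Airflow connection ID; defaults to DEFAULT_CONN_ID

  • table – defaults to None

  • load_options – defaults to None
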
property default_metadata: astro.table.Metadata

Schema and database are synonymous in MySQL.

Return type:

astro.table.Metadata

property sql_type: str
Return type:

str

DEFAULT_SCHEMA
hook()

Retrieve the Airflow hook used to interface with the MySQL database.

Return type:

airflow.providers.mysql.hooks.mysql.MySqlHook

populate_table_metadata(table)

Check whether the given table has metadata. If the metadata is missing and the database has metadata, assign it to the table. If the table schema is still undefined afterwards, fall back to the user-defined schema. The changes are made in place, and the table is also returned. In MySQL, schema is synonymous with database.

Parameters:

table (astro.table.BaseTable) – Table whose metadata may be changed

Returns:

The modified table

Return type:

astro.table.BaseTable
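The fallback order described for populate_table_metadata can be sketched as follows. The Metadata and Table dataclasses are simplified stand-ins for astro.table.Metadata and astro.table.BaseTable, and the user_schema argument is a hypothetical way of passing in the user-defined schema; the real method reads these values from the database object and may differ in detail.

```python
from dataclasses import dataclass, field, replace
from typing import Optional


@dataclass
class Metadata:  # simplified stand-in for astro.table.Metadata
    schema: Optional[str] = None
    database: Optional[str] = None

    def is_empty(self) -> bool:
        return self.schema is None and self.database is None


@dataclass
class Table:  # simplified stand-in for astro.table.BaseTable
    name: str
    metadata: Metadata = field(default_factory=Metadata)


def populate_table_metadata(
    table: Table, default_metadata: Metadata, user_schema: str
) -> Table:
    """Fill in missing metadata in place and return the same table object."""
    # Step 1: the table has no metadata, but the database does: copy it over.
    if table.metadata.is_empty() and not default_metadata.is_empty():
        table.metadata = replace(default_metadata)
    # Step 2: the schema is still undefined: fall back to the user-defined schema.
    if not table.metadata.schema:
        table.metadata.schema = user_schema
    return table
```

The database defaults are copied rather than shared, so the later schema assignment does not leak back into them.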

load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)

Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.

Parameters:
  • source_dataframe (pandas.DataFrame) – Dataframe whose contents will be loaded into the table

  • target_table (astro.table.BaseTable) – Table into which the dataframe will be loaded

  • if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.

  • chunk_size (int) – Specify the number of rows in each batch to be written at a time.

Return type:

None
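To make the if_exists and chunk_size semantics concrete, the sketch below applies them to plain row tuples and an in-memory SQLite database. It is an illustration only: the write_rows and chunked helpers are hypothetical, the example uses sqlite3 rather than MySQL or pandas, and it does not reflect how MysqlDatabase performs the load internally.

```python
import sqlite3
from typing import Iterator, List, Sequence, Tuple


def chunked(rows: Sequence[Tuple], chunk_size: int) -> Iterator[Sequence[Tuple]]:
    """Yield consecutive slices of at most chunk_size rows."""
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]


def write_rows(
    conn: sqlite3.Connection,
    table: str,
    columns: List[str],
    rows: Sequence[Tuple],
    if_exists: str = "replace",
    chunk_size: int = 2,
) -> int:
    """Write rows in batches and return the number of batches written."""
    if if_exists not in ("replace", "append"):
        raise ValueError(f"unsupported if_exists value: {if_exists!r}")
    # "replace" discards the existing table; "append" keeps its rows.
    # Table and column names are interpolated directly, so they must be trusted.
    if if_exists == "replace":
        conn.execute(f"DROP TABLE IF EXISTS {table}")
    conn.execute(f"CREATE TABLE IF NOT EXISTS {table} ({', '.join(columns)})")
    placeholders = ", ".join("?" for _ in columns)
    batches = 0
    for batch in chunked(rows, chunk_size):
        # Each batch holds at most chunk_size rows.
        conn.executemany(f"INSERT INTO {table} VALUES ({placeholders})", batch)
        batches += 1
    conn.commit()
    return batches
```

Smaller chunks bound the amount of data sent per write at the cost of more round trips.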

table_exists(table)

Check if a table exists in the database

Parameters:

table (astro.table.BaseTable) – The table whose existence is being checked

Return type:

bool

schema_exists(schema)

Checks if a schema exists in the database

Parameters:

schema – Database schema: a namespace that contains named objects such as tables and functions. In MySQL, this is synonymous with the database.

Return type:

bool

static get_table_qualified_name(table)

Return the table qualified name.

Parameters:

table (astro.table.BaseTable) – The table we want to retrieve the qualified name for.

Return type:

str
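A qualified name typically prefixes the table name with its schema (the database, in MySQL terms) when one is set. The helper below is a minimal sketch under that assumption; qualified_name is a hypothetical function that takes plain strings, whereas the real static method receives a BaseTable and may format the name differently.

```python
from typing import Optional


def qualified_name(table_name: str, schema: Optional[str] = None) -> str:
    """Return schema.table when a schema is known, else the bare table name."""
    # Assumed convention: dot-separated, with no quoting applied.
    return f"{schema}.{table_name}" if schema else table_name
```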

merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')

Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.

Parameters:
  • source_table (astro.table.BaseTable) – Contains the rows to be merged to the target_table

  • target_table (astro.table.BaseTable) – Contains the destination table in which the rows will be merged

  • source_to_target_columns_map (dict[str, str]) – Dict mapping source_table column names to target_table column names

  • target_conflict_columns (list[str]) – List of columns on which conflicts are expected when merging

  • if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.

Return type:

None
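One way to picture how the if_conflicts strategies could translate to MySQL is the statement builder below. It is a sketch, not the library's implementation: build_merge_statement is a hypothetical helper, and the strategy names "ignore" and "update" are assumed alongside the documented default "exception". In MySQL, conflicts are detected through the target table's primary and unique keys, so target_conflict_columns does not appear in the generated SQL here.

```python
from typing import Dict


def build_merge_statement(
    source_table: str,
    target_table: str,
    source_to_target_columns_map: Dict[str, str],
    if_conflicts: str = "exception",
) -> str:
    """Build a MySQL INSERT ... SELECT statement for the given conflict strategy."""
    if if_conflicts not in ("exception", "ignore", "update"):
        raise ValueError(f"unsupported if_conflicts value: {if_conflicts!r}")
    source_cols = list(source_to_target_columns_map.keys())
    target_cols = list(source_to_target_columns_map.values())
    # "ignore" silently skips rows that violate a unique key.
    insert = "INSERT IGNORE INTO" if if_conflicts == "ignore" else "INSERT INTO"
    statement = (
        f"{insert} {target_table} ({', '.join(target_cols)}) "
        f"SELECT {', '.join(source_cols)} FROM {source_table}"
    )
    if if_conflicts == "update":
        # "update" overwrites the conflicting row with the incoming values.
        # Newer MySQL 8.0 releases deprecate VALUES() in this position.
        updates = ", ".join(f"{col} = VALUES({col})" for col in target_cols)
        statement += f" ON DUPLICATE KEY UPDATE {updates}"
    # "exception" adds nothing: a duplicate key makes the plain INSERT fail.
    return statement
```

Identifiers are interpolated without quoting, so this sketch is only suitable for trusted table and column names.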

openlineage_dataset_name(table)

Returns the OpenLineage dataset name as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md. Example: database.schema.table.name

Parameters:

table (astro.table.BaseTable) – The table whose OpenLineage dataset name is returned

Return type:

str

openlineage_dataset_namespace()

Returns the OpenLineage dataset namespace as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md. Example: mysql://localhost:3306

Return type:

str

openlineage_dataset_uri(table)

Returns the OpenLineage dataset URI as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md.

Parameters:

table (astro.table.BaseTable) – The table whose OpenLineage dataset URI is returned

Return type:

str
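The three OpenLineage helpers fit together roughly as sketched below: a namespace built from the connection's host and port, a dot-separated dataset name, and a URI that joins the two. The functions are hypothetical and take plain values instead of a table or connection object; in particular, joining namespace and name with a slash is an assumption, not confirmed behaviour of MysqlDatabase.

```python
def dataset_namespace(host: str, port: int, scheme: str = "mysql") -> str:
    """Build a namespace of the form scheme://host:port."""
    return f"{scheme}://{host}:{port}"


def dataset_name(database: str, table: str) -> str:
    """Build a dot-separated dataset name (schema equals database in MySQL)."""
    return f"{database}.{table}"


def dataset_uri(host: str, port: int, database: str, table: str) -> str:
    """Join namespace and name into one URI (assumed '/' separator)."""
    return f"{dataset_namespace(host, port)}/{dataset_name(database, table)}"
```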